ludwig125のブログ

頑張りすぎずに頑張る父

GoogleAppEngine(GAE)でgoroutineを使う

GoogleAppEngineでgoroutineを使う

以下に記載されている通り、GoogleAppEngineでは並列処理はサポートしていない。

Go Runtime Environment  |  App Engine standard environment for Go  |  Google Cloud

App Engine の Go ランタイム環境は、goroutine を完全にサポートしていますが、並列実行はサポートしていません。goroutine は単一のオペレーティング システム スレッドにスケジュールされます。このシングルスレッド制限は、今後のバージョンで解除される予定です。特定のインスタンスで複数のリクエストを同時に処理することができます。これは、1 つのリクエストで Cloud Datastore API の呼び出しを待っている間に、同じインスタンスで別のリクエストを処理できることを意味します。

しかし、並行処理はできる

これは、同時に複数の処理を実行すること(並列処理)はできないが、APIのレスポンスなどを待つ間に別の処理をすること(並行処理)はできるということを意味する

【脱線】並行性と並列性の違い

www.oreilly.co.jp

「Go言語による並行処理」ではこの違いを以下の一文で表している

並行性はコードの性質を指し、並列性は動作しているプログラムの性質を指します。

ピンと来ない表現・・・

検証

検証用のコード

以下のような3つのHandlerを作成して、それぞれの挙動を確認する - testHandler - testGoroutineHandler - testGoroutineHandler2

挙動の違いが分かりやすいように、time.Sleep(3 * time.Second)を入れている

このコードは以下の「複数のgoroutineの結果の取得」に書いたものを使っている go言語の並行処理 - ludwig125のブログ

package main                                                                                   

import (
    "fmt"
    "net/http"
    "sync"
    "time"

    "google.golang.org/appengine" // Required external App Engine library
    "google.golang.org/appengine/log"
    "google.golang.org/appengine/urlfetch" // 外部にhttpするため(http.GetはGAEでは使えない)
)

func main() {
    http.HandleFunc("/test", testHandler)
    http.HandleFunc("/test_goroutine", testGoroutineHandler)
    http.HandleFunc("/test_goroutine2", testGoroutineHandler2)
    appengine.Main() // Starts the server to receive requests
}

func testHandler(w http.ResponseWriter, r *http.Request) {
    // GAE log
    ctx := appengine.NewContext(r)
    client := urlfetch.Client(ctx)

    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    checkStatus := func(urls ...string) []*http.Response {
        var response []*http.Response
        for _, url := range urls {
            log.Infof(ctx, "client.Get '%s'", url)
            resp, err := client.Get(url)
            if err != nil {
                log.Errorf(ctx, "%v", err)
                continue
            }
            response = append(response, resp)
            // 関数の実行にかかる時間を遅らせる
            log.Infof(ctx, "sleep 3 sec")
            time.Sleep(3 * time.Second)
        }
        return response
    }

    for _, response := range checkStatus(urls...) {
        log.Infof(ctx, "checkStatus: status: %s", response.Status)
    }                                                                                                                                                                 
}

func testGoroutineHandler(w http.ResponseWriter, r *http.Request) {
    // GAE log
    ctx := appengine.NewContext(r)
    client := urlfetch.Client(ctx)

    type Result struct {
        Error    error
        Response *http.Response
    }
    checkStatus := func(
        done <-chan interface{},
        urls ...string,
    ) <-chan Result {
        resultChan := make(chan Result)
        go func() {
            defer close(resultChan)
            for _, url := range urls {
                log.Infof(ctx, "client.Get '%s'", url)
                resp, err := client.Get(url)
                // 関数の実行にかかる時間を遅らせる
                log.Infof(ctx, "sleep 3 sec")
                time.Sleep(3 * time.Second)
                result := Result{Error: err, Response: resp}
                select {
                case <-done:
                    return
                case resultChan <- result:
                }
            }
        }()
        return resultChan
    }

    done := make(chan interface{})                                                                                             
    defer close(done)

    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    for result := range checkStatus(done, urls...) {
        if result.Error != nil {
            log.Infof(ctx, "error: %v", result.Error)
            continue
        }
        log.Infof(ctx, "checkStatus: status: %s", result.Response.Status)
    }
}

    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    for result := range checkStatus(done, urls...) {
        if result.Error != nil {
            log.Infof(ctx, "error: %v", result.Error)
            continue
        }
        //log.Infof(ctx, "%s", formatStatus(result.Response.Status))
        log.Infof(ctx, "checkStatus: status: %s", result.Response.Status)
    }
}

func testGoroutineHandler2(w http.ResponseWriter, r *http.Request) {
    // GAE log
    ctx := appengine.NewContext(r)
    client := urlfetch.Client(ctx)

    type Result struct {
        Error    error
        Response *http.Response
    }
    checkStatus := func(
        done <-chan interface{},
        urls []string,
    ) <-chan Result {
        resultChan := make(chan Result, len(urls))
        wg := new(sync.WaitGroup)

        defer close(resultChan)
        for _, url := range urls {
            wg.Add(1)
            go func(url string) {
                defer wg.Done()
                log.Infof(ctx, "client.Get '%s'", url)
                resp, err := client.Get(url)
                // 関数の実行にかかる時間を遅らせる
                log.Infof(ctx, "sleep 3 sec")
                time.Sleep(3 * time.Second)
                select {
                case <-done:
                    return
                case resultChan <- Result{Error: err, Response: resp}:
                }
            }(url)
        }
        wg.Wait()
        return resultChan                                                                                                             
    }

    done := make(chan interface{})
    defer close(done)
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    for result := range checkStatus(done, urls) {
        if result.Error != nil {
            log.Infof(ctx, "error: %v", result.Error)
            continue
        }
        log.Infof(ctx, "checkStatus: status: %s", result.Response.Status)
    }
}

実行して結果を確認

GoogleAppEngineの左側からトレースを選ぶと詳細がみられる

f:id:ludwig125:20190513065411p:plain

またトレース画面の右側のログの「表示」をクリックするとログがみられる

1.test(testHandler)

トレース f:id:ludwig125:20190513065517p:plain - 3つのURLの処理を3秒おきに実行していることがわかる

ログ f:id:ludwig125:20190513065641p:plain

2.test_goroutine(testGoroutineHandler)

  • goroutineを使っているが、URLは3つを順番に処理している
  • 3つそれぞれで3秒ごとのSleepを入れているため、全体の実行時間は9秒ちょっとになっている

トレース

f:id:ludwig125:20190513065853p:plain

ログ

f:id:ludwig125:20190513065945p:plain

3.test_goroutine2(testGoroutineHandler2)

  • (ほぼ)同時に3つのgoroutineを実行している
  • 「並列実行はサポートしていません」と公式に書いてあるので、おそらく全く同時ではないはずだが、これ見るとほぼ同時に見える。。
  • ともかく、並行で3つの処理を行うようにしたことで、全体としての処理時間が3秒ちょっとになった

トレース

f:id:ludwig125:20190513070119p:plain

ログ

f:id:ludwig125:20190513070141p:plain