GoogleAppEngine(GAE)でgoroutineを使う
GoogleAppEngineでgoroutineを使う
以下に記載されている通り、GoogleAppEngineでは並列処理はサポートしていない。
Go Runtime Environment | App Engine standard environment for Go | Google Cloud
App Engine の Go ランタイム環境は、goroutine を完全にサポートしていますが、並列実行はサポートしていません。goroutine は単一のオペレーティング システム スレッドにスケジュールされます。このシングルスレッド制限は、今後のバージョンで解除される予定です。特定のインスタンスで複数のリクエストを同時に処理することができます。これは、1 つのリクエストで Cloud Datastore API の呼び出しを待っている間に、同じインスタンスで別のリクエストを処理できることを意味します。
しかし、並行処理はできる
これは、同時に複数の処理を実行すること(並列処理)はできないが、APIのレスポンスなどを待つ間に別の処理をすること(並行処理)はできるということを意味する
【脱線】並行性と並列性の違い
「Go言語による並行処理」ではこの違いを以下の一文で表している
並行性はコードの性質を指し、並列性は動作しているプログラムの性質を指します。
ピンと来ない表現・・・
検証
検証用のコード
以下のような3つのHandlerを作成して、それぞれの挙動を確認する - testHandler - testGoroutineHandler - testGoroutineHandler2
挙動の違いが分かりやすいように、time.Sleep(3 * time.Second)を入れている
このコードは以下の「複数のgoroutineの結果の取得」に書いたものを使っている go言語の並行処理 - ludwig125のブログ
package main import ( "fmt" "net/http" "sync" "time" "google.golang.org/appengine" // Required external App Engine library "google.golang.org/appengine/log" "google.golang.org/appengine/urlfetch" // 外部にhttpするため(http.GetはGAEでは使えない) ) func main() { http.HandleFunc("/test", testHandler) http.HandleFunc("/test_goroutine", testGoroutineHandler) http.HandleFunc("/test_goroutine2", testGoroutineHandler2) appengine.Main() // Starts the server to receive requests } func testHandler(w http.ResponseWriter, r *http.Request) { // GAE log ctx := appengine.NewContext(r) client := urlfetch.Client(ctx) urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"} checkStatus := func(urls ...string) []*http.Response { var response []*http.Response for _, url := range urls { log.Infof(ctx, "client.Get '%s'", url) resp, err := client.Get(url) if err != nil { log.Errorf(ctx, "%v", err) continue } response = append(response, resp) // 関数の実行にかかる時間を遅らせる log.Infof(ctx, "sleep 3 sec") time.Sleep(3 * time.Second) } return response } for _, response := range checkStatus(urls...) { log.Infof(ctx, "checkStatus: status: %s", response.Status) } } func testGoroutineHandler(w http.ResponseWriter, r *http.Request) { // GAE log ctx := appengine.NewContext(r) client := urlfetch.Client(ctx) type Result struct { Error error Response *http.Response } checkStatus := func( done <-chan interface{}, urls ...string, ) <-chan Result { resultChan := make(chan Result) go func() { defer close(resultChan) for _, url := range urls { log.Infof(ctx, "client.Get '%s'", url) resp, err := client.Get(url) // 関数の実行にかかる時間を遅らせる log.Infof(ctx, "sleep 3 sec") time.Sleep(3 * time.Second) result := Result{Error: err, Response: resp} select { case <-done: return case resultChan <- result: } } }() return resultChan } done := make(chan interface{}) defer close(done) urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"} for result := range checkStatus(done, urls...) { if result.Error != nil { log.Infof(ctx, "error: %v", result.Error) continue } log.Infof(ctx, "checkStatus: status: %s", result.Response.Status) } } urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"} for result := range checkStatus(done, urls...) { if result.Error != nil { log.Infof(ctx, "error: %v", result.Error) continue } //log.Infof(ctx, "%s", formatStatus(result.Response.Status)) log.Infof(ctx, "checkStatus: status: %s", result.Response.Status) } } func testGoroutineHandler2(w http.ResponseWriter, r *http.Request) { // GAE log ctx := appengine.NewContext(r) client := urlfetch.Client(ctx) type Result struct { Error error Response *http.Response } checkStatus := func( done <-chan interface{}, urls []string, ) <-chan Result { resultChan := make(chan Result, len(urls)) wg := new(sync.WaitGroup) defer close(resultChan) for _, url := range urls { wg.Add(1) go func(url string) { defer wg.Done() log.Infof(ctx, "client.Get '%s'", url) resp, err := client.Get(url) // 関数の実行にかかる時間を遅らせる log.Infof(ctx, "sleep 3 sec") time.Sleep(3 * time.Second) select { case <-done: return case resultChan <- Result{Error: err, Response: resp}: } }(url) } wg.Wait() return resultChan } done := make(chan interface{}) defer close(done) urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"} for result := range checkStatus(done, urls) { if result.Error != nil { log.Infof(ctx, "error: %v", result.Error) continue } log.Infof(ctx, "checkStatus: status: %s", result.Response.Status) } }
実行して結果を確認
GoogleAppEngineの左側からトレースを選ぶと詳細がみられる
またトレース画面の右側のログの「表示」をクリックするとログがみられる
1.test(testHandler)
トレース - 3つのURLの処理を3秒おきに実行していることがわかる
ログ
2.test_goroutine(testGoroutineHandler)
- goroutineを使っているが、URLは3つを順番に処理している
- 3つそれぞれで3秒ごとのSleepを入れているため、全体の実行時間は9秒ちょっとになっている
トレース
ログ
3.test_goroutine2(testGoroutineHandler2)
- (ほぼ)同時に3つのgoroutineを実行している
- 「並列実行はサポートしていません」と公式に書いてあるので、おそらく全く同時ではないはずだが、これ見るとほぼ同時に見える。。
- ともかく、並行で3つの処理を行うようにしたことで、全体としての処理時間が3秒ちょっとになった
トレース
ログ