go言語の並行処理
概要
「Go言語による並行処理」を読んだのでメモ ※この本に書いてないことも以下では取り上げている
goroutineとsync
goroutineの書き方
一番単純なgoroutine(これは期待した通り動かない)
package main import "fmt" func main() { hello := func() { fmt.Println("hello") } go hello() }
無名関数helloを定義して、「go 関数()」で実行する しかしこれは実行しても何も出力されない
$go run ex1.go $
その理由は、main goroutineとhello goroutineはそれぞれ別個に動くので、 mainが終わる前に子の処理のhelloを待たないため。
そこで、意図したとおりにするためには、syncで待ち合わせをすることが一番簡単な実現方法となる
syncで待ち合わせをする(期待通りの挙動になる)
syncパッケージを使って待ち合わせをする場合は以下のようにする
- wg.Add(待ちたい処理の数)を定義
- hello goroutineが終わる前に、このgoroutineの終了を伝えるwg.Doneをdeferで実行
- hello goroutineが終わるまで、wg.Wait()で待つ
$cat ex2.go package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup wg.Add(1) hello := func() { defer wg.Done() fmt.Println("hello") } go hello() wg.Wait() }
これを実行すると以下の通り出力される
$go run ex2.go hello $
syncで待ち合わせをする(helloを定義しない)
上の例はより簡単に、以下のようにも書ける
- かっこの最後に 「 }()」がついていることに注意
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() fmt.Println("hello") }() wg.Wait() }
syncで待ち合わせをする(helloを通常の関数として定義する)
helloを通常の関数として定義する場合には以下のように、WaitGroupをポインタとして渡すことが必要になる
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup wg.Add(1) go hello(&wg) wg.Wait() } func hello(wg *sync.WaitGroup) { defer wg.Done() fmt.Println("hello") }
syncで複数の待ち合わせをする
waitNum := 3 と定義して、その回数分だけ待つ
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup waitNum := 3 wg.Add(waitNum) hello := func() { defer wg.Done() fmt.Println("hello") } for i := 0; i < waitNum; i++ { go hello() } wg.Wait() }
結果
hello hello hello
goroutineに引数を渡す
通常の関数と同様に、goroutineにも引数を渡せる
$cat ex3-2.go package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup waitNum := 3 wg.Add(waitNum) hello := func(i int) { defer wg.Done() fmt.Printf("hello %d\n", i) } for i := 0; i < waitNum; i++ { go hello(i) } wg.Wait() }
結果
$go run ex3-2.go hello 2 hello 0 hello 1
channel
最も簡単なchannel
一番簡単な例
- channelは複数のgoroutine間でデータを共有するためにあるので、ふつうこんな使い方はしない
package main import "fmt" func main() { ch := make(chan int, 1) // channelに5を送信 ch <- 5 // channelから整数値を受信 i := <-ch fmt.Println(i) }
実行結果
$go run chan.go 5
goroutineでのchannel
goroutineでchanelをやり取りする例
package main import "fmt" func main() { ch := make(chan int) go func() { // channelに5を送信 ch <- 5 }() // channelから5を受信 fmt.Println(<-ch) }
実行結果
5
複数のchannelを送受信することもできる
package main import "fmt" func main() { ch := make(chan int) n := 3 go func() { for i := 0; i < n; i++ { ch <- i } }() for i := 0; i < n; i++ { fmt.Println(<-ch) } }
実行結果
0 1 2
for rangeを使ったchannelの受信
for rangeを使ってchannelを受信する書き方もできる
ただし、for rangeを使って受信する場合、channelの終了がわからず永遠に待ち続けるので、goroutine側がこれ以上channelに送信しなくなった時点でchannelをcloseすることが必要
package main import "fmt" func main() { ch := make(chan int) n := 3 go func() { defer close(ch) // channelの終了を送信 for i := 0; i < n; i++ { ch <- i } }() for i := range ch { fmt.Println(i) } }
実行結果
0 1 2
deferを忘れると、待ち続けてdeadlockになる
deferをつけない場合の実行結果
0 1 2 fatal error: all goroutines are asleep - deadlock!
channelのcloseを確認する
channelからは、2番目の返り値としてchannelがclose済みかそうでないかを表すbool値が取得できる 慣習的にこの2番目の返り値はokとする
上の例で言えば、chから得られた値がi、chがclose済みかどうかがok(closeされていればfalse)となる。このokはとっても取らなくてもいい
i, ok := <-ch
参考 - A Tour of Go
上の例を、okがfalseとなるまでchの中身を出力し続けるようにした場合、以下のようになる
- ※上のfor rangeを使った書き方より冗長だしミスると無限ループになるからめったに使う必要ないと思う
package main import "fmt" func main() { ch := make(chan int) n := 3 go func() { defer close(ch) // channelの終了を送信 for i := 0; i < n; i++ { ch <- i } }() for { i, ok := <-ch if !ok { fmt.Println("ch closed") return // このreturnを忘れると無限ループになる } fmt.Println(i) } }
実行結果
0 1 2 ch closed
bufferなしchannel
bufferなしchannelとbufferつきchannelの挙動の違いについてシンプルにr例示している資料が意外とすぐに見つからなかった
まずbufferなしchannelの例
package main import ( "fmt" "time" ) func main() { ch := make(chan int) go func() { defer close(ch) defer fmt.Println("Producer Done") for i := 0; i < 5; i++ { ch <- i fmt.Printf("Sending: %d\n", i) } }() for i := range ch { fmt.Printf("Received %v\n", i) time.Sleep(1 * time.Second) } }
実行結果
Sending: 0 Received 0 Received 1 Sending: 1 Received 2 Sending: 2 Received 3 Sending: 3 Received 4 Sending: 4 Producer Done
- 1秒おきにSendingとReceivedが交互に出力されている
channelが満杯のときはchannelへの書き込みは(読み込みされるまで)ブロックされるので、bufferなしchannelは毎回読み込みをした後で追加の書き込みをしている
bufferつきchannel
bufferつきchannelは一度に送信できる件数を指定できる
package main import ( "fmt" "time" ) func main() { ch := make(chan int, 4) go func() { defer close(ch) defer fmt.Println("Producer Done") for i := 0; i < 5; i++ { ch <- i fmt.Printf("Sending: %d\n", i) } }() for i := range ch { fmt.Printf("Received %v\n", i) time.Sleep(1 * time.Second) } }
実行結果
Sending: 0 Sending: 1 Sending: 2 Sending: 3 Sending: 4 Producer Done Received 0 Received 1 Received 2 Received 3 Received 4
- buffer4に指定した場合、channelに対して先に4つ書き込みが終了し、あとからReceivedされていることがわかる
- このように、bufferつきchannelは読み込みを待つことなく何件まで書き込めるかを指定できる
select
selectは複数のcaseに対して、それぞれの条件が等しく選択されるように、疑似乱数を使って配分している。
例えば、以下のようにc1, c2という2つのチャネルから合計1000回読み込むとする。
- c1からのチャネルから読み込んだらc1Countを増やす
- c2からのチャネルから読み込んだらc2Countを増やす
package main import ( "fmt" ) func main() { c1 := make(chan interface{}) close(c1) c2 := make(chan interface{}) close(c2) var c1Count, c2Count int for i := 1000; i >= 0; i-- { select { case <-c1: c1Count++ case <-c2: c2Count++ } } fmt.Printf("c1Count: %d, c2Count: %d.\n", c1Count, c2Count) }
このコードを何回も実行した結果を見てみると、c1, c2がほぼ同じ回数ずつカウントされていることがわかる
$go run select2.go c1Count: 499, c2Count: 502. $go run select2.go c1Count: 498, c2Count: 503. $go run select2.go c1Count: 520, c2Count: 481. $go run select2.go c1Count: 513, c2Count: 488. $go run select2.go c1Count: 505, c2Count: 496. $go run select2.go c1Count: 502, c2Count: 499. $go run select2.go c1Count: 509, c2Count: 492. $
for-selectでdefault処理
以下のように、チャネルを待っている間やりたい処理をdefaultに書くことでgoroutineの結果を待っている間他の処理ができる
for { select { case <-チャネル: <チャネルを受け取ったあとの処理> case <-チャネル2: <チャネル2を受け取ったあとの処理> default: } <チャネル1とチャネル2を受け取る前に行いたい処理> }
for で無限ループをしながらselectでdoneが来るのを待ち続ける例
goroutineが5秒後にdoneにcloseを送るので、処理が終わる
それまではWaitを繰り返す
package main import ( "fmt" "time" ) func main() { done := make(chan interface{}) go func() { defer close(done) fmt.Println("Goroutine start") time.Sleep(5 * time.Second) fmt.Println("Goroutine end. send done") }() i := 0 for { select { case <-done: fmt.Println("Receive done") return default: } i++ fmt.Printf("Wait %d\n", i) time.Sleep(1 * time.Second) } }
これを実行すると以下のようになる
$go run sample.go Wait 1 Goroutine start Wait 2 Wait 3 Wait 4 Wait 5 Goroutine end. send done Receive done
ちなみに、以下のようにdefautlとselectの終端の}の間に処理を入れても同様の結果になるが、この処理が往々にして長くなることがあってselectの終端がわかりにくくなるので、自分はあまり好んで使う人はいないようだ
default: <チャネル1とチャネル2を受け取る前に行いたい処理> }
timeout
Go by Exampleに書いてあるtimeoutの方法
Go by Example: Timeouts に書いてある方法
time.After(タイムアウトする時間) を設定してselect文で受け取ることで、時間がかかるgoroutineのタイムアウト処理ができる
以下の例では、 - 2秒かかるgoroutineの結果c1が1秒のtimeout制限に引っかかってtimeout 1が出力され、 - 同じく2秒かかるgoroutineの結果c2が3秒のtimeout制限以内に終わるので、result 2が出力される
package main import ( "fmt" "time" ) func main() { c1 := make(chan string, 1) go func() { time.Sleep(2 * time.Second) c1 <- "result 1" }() select { case res := <-c1: fmt.Println(res) case <-time.After(1 * time.Second): fmt.Println("timeout 1") } c2 := make(chan string, 1) go func() { time.Sleep(2 * time.Second) c2 <- "result 2" }() select { case res := <-c2: fmt.Println(res) case <-time.After(3 * time.Second): fmt.Println("timeout 2") } }
実行結果
timeout 1 result 2
上のtimeout方法の問題点
上の方法には一つ問題があって、timeout 1と出てもgoroutineに切り出した処理は自動で止まらないということがある
ためしに以下のようにc1で受け取っていた部分を以下のようにtaskというgoroutineにして、task関数の中では0から999999まで数字を出力し続けるようにしてみる - また、最初と最後の時間差をとって、処理時間を計測してみる (結果は長くなるので注意)
package main import ( "fmt" "log" "time" ) func main() { start := time.Now() select { case res := <-task(): fmt.Println(res) case <-time.After(1 * time.Second): fmt.Println("timeout 1") } log.Printf("Total time: %fs", time.Since(start).Seconds()) } func task() <-chan string { res := make(chan string) go func() { defer close(res) for i := 0; i < 1000000; i++ { fmt.Println(i) time.Sleep(1 * time.Nanosecond) } res <- "done!" }() return res }
実行結果
0 1 2 (省略) 475135 475136 475137 timeout 1 2019/09/29 06:11:19 Total time: 12.296678s
おかしなことが起きた! 実行結果はマシンの性能にもよると思うが、「timeout 1」まで出した後でtimeoutが出力されて、「 Total time: 12.296678s」となっている。
しかも恐ろしいことに2回目に実行すると、今度は「timeout 1」すら出ずに12秒かかっている。
489587 489588 489589 2019/09/29 06:13:37 Total time: 12.209647s
3回目もまた結果が変わる
489490 489491 489492 2019/09/29 06:14:39 Total time: 12.120974s
timeoutとなっても、すでに起動してfmt.Println(i) を出力し続けるgoroutineは勝手に暴走して、それが終わるタイミングは全然予測できていない
これをtimeout時間の通り1秒で終了させたい場合は以下のようにcontextやdoneチャネルを使うといい。
timeoutでcontext cancelを行う場合
上のコードをcontextを用いて書き直したものが以下となる。
timeout時刻になるとcontextのcancel処理が飛んで、1秒で処理が終わっていることがわかる
package main import ( "context" "fmt" "log" "time" ) func main() { start := time.Now() ctx, cancel := context.WithCancel(context.Background()) defer cancel() res := task(ctx) select { case <-res: fmt.Println(res) case <-time.After(1 * time.Second): log.Println("timeout after 1s") cancel() log.Println(<-res) } log.Printf("Total time: %fs", time.Since(start).Seconds()) } func task(ctx context.Context) <-chan string { res := make(chan string) go func() { defer close(res) for i := 0; i < 1000000; i++ { select { case <-ctx.Done(): log.Printf("context cancelled. count: %d", i) res <- fmt.Sprintf("error: %s", ctx.Err()) return default: } fmt.Println(i) time.Sleep(1 * time.Nanosecond) } res <- "done!" }() return res }
実行結果
(省略) 38503 38504 38505 38506 38507 2019/09/29 06:30:35 timeout after 1s 2019/09/29 06:30:35 context cancelled. count: 37841 2019/09/29 06:30:35 error: context canceled 2019/09/29 06:30:35 Total time: 1.000472s
contextの使い方は公式は以下。ほかにもネット上にたくさんあるので、以下で簡単な説明だけ記載しておく。 Go Concurrency Patterns: Context - The Go Blog
簡単に説明
- コードの最初でcancel 機能付きのcontextを宣言する
- 「defer cancel()」によって、プログラム終了時に確実にすべてのcontextがキャンセルされるようにする
ctx, cancel := context.WithCancel(context.Background()) defer cancel()
- task関数にこのcontextを渡して(第一引数として渡すのがセオリー)、for 文の1ループごとに、「ctx.Done()」が来ていないかチェックしている
- このプログラムでは 「ctx.Done()」を受け取ったらlogに「context cancelled. count」とその時のループ番号を出力している
- さらにresにエラー原因として「 ctx.Err()」を返してreturnでgoroutineを終える
- このreturnがないとDoneを受け取ってもgoroutineが終わらないので必要
for i := 0; i < 1000000; i++ { select { case <-ctx.Done(): log.Printf("context cancelled. count: %d", i) res <- fmt.Sprintf("error: %s", ctx.Err()) return default: } fmt.Println(i) time.Sleep(1 * time.Nanosecond) }
- 呼び出し側のmain内では、「res := task(ctx)」としてtaskの戻り値(channel)をいったんres変数に定義
- time.Afterの制限時間内に処理が終われば、「case <-res」に入る
- time.Afterの制限時間を過ぎたら、「timeout after 1s」をlogに表示させて、contextのcancel関数を呼び出す(重要)
- 「log.Println(<-res)」には、task関数内で詰めたctx.Err()の内容が出力される「error: context canceled」
- 「<-res」とすることで、cancelをした後のtask関数が返すエラーをきちんと受け取ってから終わるように待ち合わせをしている
res := task(ctx) select { case <-res: fmt.Println(res) case <-time.After(1 * time.Second): log.Println("timeout after 1s") cancel() log.Println(<-res) }
contextを使うことで、goroutineを安全に終了させることができる
pipeline、fan-in、fan-out
go言語のpipeline、fan-in、fan-out - ludwig125のブログ
複数のgoroutineのエラーハンドリングをする
同時並列数を制御する
シグナル処理
GoogleAppEngineでgoroutineを使う
GoogleAppEngineでgoroutineを使った例 ludwig125.hatenablog.com