ludwig125のブログ

頑張りすぎずに頑張る父

go言語の並行処理

概要

「Go言語による並行処理」を読んだのでメモ ※この本に書いてないことも以下では取り上げている

github.com

goroutineとsync

goroutineの書き方

一番単純なgoroutine(これは期待した通り動かない)

package main

import "fmt"

func main() {
        hello := func() {
                fmt.Println("hello")
        }
        go hello()
}

無名関数helloを定義して、「go 関数()」で実行する しかしこれは実行しても何も出力されない

$go run ex1.go
$

その理由は、main goroutineとhello goroutineはそれぞれ別個に動くので、 mainが終わる前に子の処理のhelloを待たないため。

そこで、意図したとおりにするためには、syncで待ち合わせをすることが一番簡単な実現方法となる

syncで待ち合わせをする(期待通りの挙動になる)

syncパッケージを使って待ち合わせをする場合は以下のようにする

  • wg.Add(待ちたい処理の数)を定義
  • hello goroutineが終わる前に、このgoroutineの終了を伝えるwg.Doneをdeferで実行
  • hello goroutineが終わるまで、wg.Wait()で待つ
$cat ex2.go
package main

import (
        "fmt"
        "sync"
)

func main() {
        var wg sync.WaitGroup

        wg.Add(1)
        hello := func() {
                defer wg.Done()
                fmt.Println("hello")
        }
        go hello()

        wg.Wait()
}

これを実行すると以下の通り出力される

$go run ex2.go                                                          
hello
$  

syncで待ち合わせをする(helloを定義しない)

上の例はより簡単に、以下のようにも書ける

  • かっこの最後に 「 }()」がついていることに注意
package main

import (
        "fmt"
        "sync"
)

func main() {
        var wg sync.WaitGroup

        wg.Add(1)
        go func() {
                defer wg.Done()
                fmt.Println("hello")
        }()

        wg.Wait()
}

syncで待ち合わせをする(helloを通常の関数として定義する)

helloを通常の関数として定義する場合には以下のように、WaitGroupをポインタとして渡すことが必要になる

package main

import (
        "fmt"
        "sync"
)

func main() {
        var wg sync.WaitGroup

        wg.Add(1)
        go hello(&wg)

        wg.Wait()
}

func hello(wg *sync.WaitGroup) {
        defer wg.Done()
        fmt.Println("hello")
}

syncで複数の待ち合わせをする

waitNum := 3 と定義して、その回数分だけ待つ

package main

import (
        "fmt"
        "sync"
)

func main() {
        var wg sync.WaitGroup

        waitNum := 3
        wg.Add(waitNum)
        hello := func() {
                defer wg.Done()
                fmt.Println("hello")
        }

        for i := 0; i < waitNum; i++ {
                go hello()
        }

        wg.Wait()
}

結果

hello
hello
hello

goroutineに引数を渡す

通常の関数と同様に、goroutineにも引数を渡せる

$cat ex3-2.go
package main

import (
        "fmt"
        "sync"
)

func main() {
        var wg sync.WaitGroup

        waitNum := 3
        wg.Add(waitNum)
        hello := func(i int) {
                defer wg.Done()
                fmt.Printf("hello %d\n", i)
        }

        for i := 0; i < waitNum; i++ {
                go hello(i)
        }

        wg.Wait()
}

結果

$go run ex3-2.go             
hello 2
hello 0
hello 1

channel

最も簡単なchannel

一番簡単な例

  • channelは複数のgoroutine間でデータを共有するためにあるので、ふつうこんな使い方はしない
package main

import "fmt"

func main() {
    ch := make(chan int, 1)

    // channelに5を送信
    ch <- 5

    // channelから整数値を受信
    i := <-ch
    fmt.Println(i)                     
}

実行結果

$go run chan.go
5

goroutineでのchannel

goroutineでchanelをやり取りする例

package main

import "fmt"

func main() {
    ch := make(chan int)                   
    go func() {
        // channelに5を送信
        ch <- 5
    }()

    // channelから5を受信
    fmt.Println(<-ch)
}

実行結果

5

複数のchannelを送受信することもできる

package main

import "fmt"

func main() {
    ch := make(chan int)
    n := 3
    go func() {
        for i := 0; i < n; i++ {
            ch <- i
        }
    }()

    for i := 0; i < n; i++ {
        fmt.Println(<-ch)
    }
}

実行結果

0
1
2

for rangeを使ったchannelの受信

for rangeを使ってchannelを受信する書き方もできる

ただし、for rangeを使って受信する場合、channelの終了がわからず永遠に待ち続けるので、goroutine側がこれ以上channelに送信しなくなった時点でchannelをcloseすることが必要

package main

import "fmt"

func main() {
    ch := make(chan int)
    n := 3
    go func() {
        defer close(ch) // channelの終了を送信
        for i := 0; i < n; i++ {
            ch <- i
        }
    }()

    for i := range ch {
        fmt.Println(i)
    }
}

実行結果

0
1
2

deferを忘れると、待ち続けてdeadlockになる

deferをつけない場合の実行結果

0
1
2
fatal error: all goroutines are asleep - deadlock!
channelのcloseを確認する

channelからは、2番目の返り値としてchannelがclose済みかそうでないかを表すbool値が取得できる 慣習的にこの2番目の返り値はokとする

上の例で言えば、chから得られた値がi、chがclose済みかどうかがok(closeされていればfalse)となる。このokはとっても取らなくてもいい i, ok := <-ch

参考 - A Tour of Go

上の例を、okがfalseとなるまでchの中身を出力し続けるようにした場合、以下のようになる

  • ※上のfor rangeを使った書き方より冗長だしミスると無限ループになるからめったに使う必要ないと思う
package main

import "fmt"

func main() {
    ch := make(chan int)
    n := 3
    go func() {
        defer close(ch) // channelの終了を送信
        for i := 0; i < n; i++ {
            ch <- i
        }
    }()

    for {
        i, ok := <-ch
        if !ok {
            fmt.Println("ch closed")
            return // このreturnを忘れると無限ループになる
        }
        fmt.Println(i)
    }
}

実行結果

0
1
2
ch closed

bufferなしchannel

bufferなしchannelとbufferつきchannelの挙動の違いについてシンプルにr例示している資料が意外とすぐに見つからなかった

まずbufferなしchannelの例

package main

import (
        "fmt"
        "time"
)

func main() {
        ch := make(chan int)
        go func() {
                defer close(ch)      
                defer fmt.Println("Producer Done")
                for i := 0; i < 5; i++ {
                        ch <- i
                        fmt.Printf("Sending: %d\n", i)
                }
        }()
    
        for i := range ch {
                fmt.Printf("Received %v\n", i)
                time.Sleep(1 * time.Second)
        }
       
}

実行結果

Sending: 0
Received 0
Received 1
Sending: 1
Received 2
Sending: 2
Received 3
Sending: 3
Received 4
Sending: 4
Producer Done
  • 1秒おきにSendingとReceivedが交互に出力されている

channelが満杯のときはchannelへの書き込みは(読み込みされるまで)ブロックされるので、bufferなしchannelは毎回読み込みをした後で追加の書き込みをしている

bufferつきchannel

bufferつきchannelは一度に送信できる件数を指定できる

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 4)
    go func() {
        defer close(ch)
        defer fmt.Println("Producer Done")
        for i := 0; i < 5; i++ {
            ch <- i
            fmt.Printf("Sending: %d\n", i)
        }
    }()

    for i := range ch {
        fmt.Printf("Received %v\n", i)
        time.Sleep(1 * time.Second)
    }

}

実行結果

Sending: 0
Sending: 1
Sending: 2
Sending: 3
Sending: 4
Producer Done
Received 0
Received 1
Received 2
Received 3
Received 4
  • buffer4に指定した場合、channelに対して先に4つ書き込みが終了し、あとからReceivedされていることがわかる
  • このように、bufferつきchannelは読み込みを待つことなく何件まで書き込めるかを指定できる

select

selectは複数のcaseに対して、それぞれの条件が等しく選択されるように、疑似乱数を使って配分している。

例えば、以下のようにc1, c2という2つのチャネルから合計1000回読み込むとする。

  • c1からのチャネルから読み込んだらc1Countを増やす
  • c2からのチャネルから読み込んだらc2Countを増やす
package main

import (
        "fmt"
)

func main() {
        c1 := make(chan interface{})
        close(c1)
        c2 := make(chan interface{})
        close(c2)
        var c1Count, c2Count int
        for i := 1000; i >= 0; i-- {
                select {
                case <-c1:
                        c1Count++
                case <-c2:
                        c2Count++
                }
        }

        fmt.Printf("c1Count: %d, c2Count: %d.\n", c1Count, c2Count)
}

このコードを何回も実行した結果を見てみると、c1, c2がほぼ同じ回数ずつカウントされていることがわかる

$go run select2.go
c1Count: 499, c2Count: 502.
$go run select2.go
c1Count: 498, c2Count: 503.
$go run select2.go
c1Count: 520, c2Count: 481.
$go run select2.go
c1Count: 513, c2Count: 488.
$go run select2.go
c1Count: 505, c2Count: 496.
$go run select2.go
c1Count: 502, c2Count: 499.
$go run select2.go
c1Count: 509, c2Count: 492.
$ 

for-selectでdefault処理

参考:Go by Example: Select

以下のように、チャネルを待っている間やりたい処理をdefaultに書くことでgoroutineの結果を待っている間他の処理ができる

for {
    select {
    case <-チャネル:
        <チャネルを受け取ったあとの処理>
    case <-チャネル2:
        <チャネル2を受け取ったあとの処理>
    default:
    }
    <チャネル1とチャネル2を受け取る前に行いたい処理>
}

for で無限ループをしながらselectでdoneが来るのを待ち続ける例

goroutineが5秒後にdoneにcloseを送るので、処理が終わる

それまではWaitを繰り返す

package main

import (
    "fmt"
    "time"
)

func main() {

    done := make(chan interface{})
    go func() {
        defer close(done)
        fmt.Println("Goroutine start")
        time.Sleep(5 * time.Second)
        fmt.Println("Goroutine end. send done")
    }()

    i := 0
    for {
        select {
        case <-done:
            fmt.Println("Receive done")
            return
        default:
        }

        i++
        fmt.Printf("Wait %d\n", i)
        time.Sleep(1 * time.Second)
    }
}

これを実行すると以下のようになる

$go run sample.go
Wait 1
Goroutine start
Wait 2
Wait 3
Wait 4
Wait 5
Goroutine end. send done
Receive done

ちなみに、以下のようにdefautlとselectの終端の}の間に処理を入れても同様の結果になるが、この処理が往々にして長くなることがあってselectの終端がわかりにくくなるので、自分はあまり好んで使う人はいないようだ

 default:
        <チャネル1とチャネル2を受け取る前に行いたい処理>
    }

timeout

Go by Exampleに書いてあるtimeoutの方法

Go by Example: Timeouts に書いてある方法

time.After(タイムアウトする時間) を設定してselect文で受け取ることで、時間がかかるgoroutineのタイムアウト処理ができる

以下の例では、 - 2秒かかるgoroutineの結果c1が1秒のtimeout制限に引っかかってtimeout 1が出力され、 - 同じく2秒かかるgoroutineの結果c2が3秒のtimeout制限以内に終わるので、result 2が出力される

package main

import (
    "fmt"
    "time"
)

func main() {
    c1 := make(chan string, 1)
    go func() {
        time.Sleep(2 * time.Second)
        c1 <- "result 1"
    }()
    select {
    case res := <-c1:
        fmt.Println(res)
    case <-time.After(1 * time.Second):
        fmt.Println("timeout 1")
    }

    c2 := make(chan string, 1)
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "result 2"
    }()
    select {
    case res := <-c2:
        fmt.Println(res)
    case <-time.After(3 * time.Second):
        fmt.Println("timeout 2")
    }
}

実行結果

timeout 1
result 2

上のtimeout方法の問題点

上の方法には一つ問題があって、timeout 1と出てもgoroutineに切り出した処理は自動で止まらないということがある

ためしに以下のようにc1で受け取っていた部分を以下のようにtaskというgoroutineにして、task関数の中では0から999999まで数字を出力し続けるようにしてみる - また、最初と最後の時間差をとって、処理時間を計測してみる (結果は長くなるので注意)

package main

import (
    "fmt"
    "log"
    "time"
)

func main() {
    start := time.Now()
    select {
    case res := <-task():
        fmt.Println(res)
    case <-time.After(1 * time.Second):
        fmt.Println("timeout 1")
    }
    log.Printf("Total time: %fs", time.Since(start).Seconds())
}

func task() <-chan string {
    res := make(chan string)
    go func() {
        defer close(res)
        for i := 0; i < 1000000; i++ {
            fmt.Println(i)
            time.Sleep(1 * time.Nanosecond)
        }
        res <- "done!"
    }()
    return res
}

The Go Playground

実行結果

0
1
2
(省略)
475135
475136
475137
timeout 1
2019/09/29 06:11:19 Total time: 12.296678s

おかしなことが起きた! 実行結果はマシンの性能にもよると思うが、「timeout 1」まで出した後でtimeoutが出力されて、「 Total time: 12.296678s」となっている。

しかも恐ろしいことに2回目に実行すると、今度は「timeout 1」すら出ずに12秒かかっている。

489587
489588
489589
2019/09/29 06:13:37 Total time: 12.209647s

3回目もまた結果が変わる

489490
489491
489492
2019/09/29 06:14:39 Total time: 12.120974s

timeoutとなっても、すでに起動してfmt.Println(i) を出力し続けるgoroutineは勝手に暴走して、それが終わるタイミングは全然予測できていない

これをtimeout時間の通り1秒で終了させたい場合は以下のようにcontextやdoneチャネルを使うといい。

timeoutでcontext cancelを行う場合

上のコードをcontextを用いて書き直したものが以下となる。

timeout時刻になるとcontextのcancel処理が飛んで、1秒で処理が終わっていることがわかる

package main

import (
    "context"
    "fmt"
    "log"
    "time"
)

func main() {
    start := time.Now()
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    res := task(ctx)
    select {
    case <-res:
        fmt.Println(res)
    case <-time.After(1 * time.Second):
        log.Println("timeout after 1s")
        cancel()
        log.Println(<-res)
    }
    log.Printf("Total time: %fs", time.Since(start).Seconds())
}

func task(ctx context.Context) <-chan string {
    res := make(chan string)
    go func() {
        defer close(res)
        for i := 0; i < 1000000; i++ {
            select {
            case <-ctx.Done():
                log.Printf("context cancelled. count: %d", i)
                res <- fmt.Sprintf("error: %s", ctx.Err())
                return
            default:
            }
            fmt.Println(i)
            time.Sleep(1 * time.Nanosecond)
        }
        res <- "done!"
    }()
    return res
}

The Go Playground

実行結果

(省略)
38503
38504
38505
38506
38507
2019/09/29 06:30:35 timeout after 1s
2019/09/29 06:30:35 context cancelled. count: 37841
2019/09/29 06:30:35 error: context canceled
2019/09/29 06:30:35 Total time: 1.000472s

contextの使い方は公式は以下。ほかにもネット上にたくさんあるので、以下で簡単な説明だけ記載しておく。 Go Concurrency Patterns: Context - The Go Blog

簡単に説明
  • コードの最初でcancel 機能付きのcontextを宣言する
  • 「defer cancel()」によって、プログラム終了時に確実にすべてのcontextがキャンセルされるようにする
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
  • task関数にこのcontextを渡して(第一引数として渡すのがセオリー)、for 文の1ループごとに、「ctx.Done()」が来ていないかチェックしている
  • このプログラムでは 「ctx.Done()」を受け取ったらlogに「context cancelled. count」とその時のループ番号を出力している
  • さらにresにエラー原因として「 ctx.Err()」を返してreturnでgoroutineを終える
  • このreturnがないとDoneを受け取ってもgoroutineが終わらないので必要
for i := 0; i < 1000000; i++ {
    select {
    case <-ctx.Done():
        log.Printf("context cancelled. count: %d", i)
        res <- fmt.Sprintf("error: %s", ctx.Err())
        return
    default:
    }
    fmt.Println(i)
    time.Sleep(1 * time.Nanosecond)
}
  • 呼び出し側のmain内では、「res := task(ctx)」としてtaskの戻り値(channel)をいったんres変数に定義
  • time.Afterの制限時間内に処理が終われば、「case <-res」に入る
  • time.Afterの制限時間を過ぎたら、「timeout after 1s」をlogに表示させて、contextのcancel関数を呼び出す(重要)
  • 「log.Println(<-res)」には、task関数内で詰めたctx.Err()の内容が出力される「error: context canceled」
    • 「<-res」とすることで、cancelをした後のtask関数が返すエラーをきちんと受け取ってから終わるように待ち合わせをしている
res := task(ctx)
select {
case <-res:
    fmt.Println(res)
case <-time.After(1 * time.Second):
    log.Println("timeout after 1s")
    cancel()
    log.Println(<-res)
}

contextを使うことで、goroutineを安全に終了させることができる

pipeline、fan-in、fan-out

go言語のpipeline、fan-in、fan-out - ludwig125のブログ

複数のgoroutineのエラーハンドリングをする

ludwig125.hatenablog.com

同時並列数を制御する

ludwig125.hatenablog.com

シグナル処理

ludwig125.hatenablog.com

GoogleAppEngineでgoroutineを使う

GoogleAppEngineでgoroutineを使った例 ludwig125.hatenablog.com