go言語で同時並列数を制御する

関連

ludwig125.hatenablog.com

同時並列数の制御

【同時並列数の制御】1. 並列数を制限しない場合

  • 並列数を制限しない場合はこの通り単純
  • 複数のgoroutineを起動する場合は、WaitGroupで待ち合わせをする
  • ※time.Sleep(1 * time.Second)は処理の様子をわかりやすくするため入れているだけで、実用では必要ない
package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    doTask()
    log.Println("finished")
}

func doTask() {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    for _, num := range numbers {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            fnA(n)

            // 処理をわかりやすくするため
            time.Sleep(1 * time.Second)
        }(num)
    }
    wg.Wait()
}

func fnA(n int) {
    log.Printf("do fnA. num: %d \n", n)
}

https://play.golang.org/p/JxdXBOThF0v

実行結果

2019/06/15 16:57:46 do fnA. num: 6 
2019/06/15 16:57:46 do fnA. num: 1 
2019/06/15 16:57:46 do fnA. num: 2 
2019/06/15 16:57:46 do fnA. num: 3 
2019/06/15 16:57:46 do fnA. num: 4 
2019/06/15 16:57:46 do fnA. num: 5 
2019/06/15 16:57:47 finished
  • 全てのgoroutineが同時に起動して、それぞれ1秒Sleepしたあとでfinishedが出力されている

【同時並列数の制御】2. 並列数を制限する場合

並列数を制限する場合 - 最大同時並列実行数をバッファサイズとしたチャネルを作り、そのチャネルの待ち合わせをすることで実現できる - semチャネルは、一旦concurrency数だけ受信したらバッファがいっぱいになるので、「<-sem」が呼ばれて解放されない限り、後続のgoroutineは起動しない => 最大同時並列実行数を制限できる

package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    doTask()
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

func doTask() {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    sem := make(chan struct{}, concurrency) // concurrency数のバッファ
    for _, num := range numbers {
        sem <- struct{}{}

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }() // 処理が終わったらチャネルを解放
            fnA(n)

            // 処理をわかりやすくするため
            time.Sleep(1 * time.Second)
        }(num)
    }
    wg.Wait()
}

func fnA(n int) {
    log.Printf("do fnA. num: %d \n", n)
}

https://play.golang.org/p/CEn0tw5SR-A

実行結果

2019/06/15 17:20:36 do fnA. num: 2 
2019/06/15 17:20:36 do fnA. num: 1 
2019/06/15 17:20:37 do fnA. num: 3 
2019/06/15 17:20:37 do fnA. num: 4 
2019/06/15 17:20:38 do fnA. num: 5 
2019/06/15 17:20:38 do fnA. num: 6 
2019/06/15 17:20:39 finished
  • concurrency数ずつ(ここでは2つずつ)1秒おきに実行されていることがわかる

参考

上のコードの「sem <- struct{}{}」の後ろでlen(sem)を出力してみると、一旦semチャネルのバッファがconcurrency数=2に達したら、あとは2を保ったまま後続のgoroutineが起動しているのがわかる

sem <- struct{}{}
fmt.Printf("len(sem): %d\n", len(sem)) // <- バッファ内の値を出力

実行結果

len(sem): 1
len(sem): 2
2019/06/15 20:54:28 do fnA. num: 2 
2019/06/15 20:54:28 do fnA. num: 1 
len(sem): 2
2019/06/15 20:54:29 do fnA. num: 3 
len(sem): 2
2019/06/15 20:54:29 do fnA. num: 4 
len(sem): 2
2019/06/15 20:54:30 do fnA. num: 5 
len(sem): 2
2019/06/15 20:54:30 do fnA. num: 6 
2019/06/15 20:54:31 finished

【同時並列数の制御】2-2. 並列数を制限する場合(チャネルを最後にcloseする)

上のをちょっと改良 - チャネルを使ったら最後にcloseしておいた方が安全なので、 - 以下のように全部のgoroutineを待って最後にチャネルをcloseするために、別のgoroutineを用意しておくと良い

  • goroutineが1つだけの場合は最初のgo func()内に、「defer close(チャネル)」を呼び出せばいいが、今回のように複数のgoroutineを待つ場合はこのように書くのが良さそう
func doTask() {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    sem := make(chan struct{}, concurrency)
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()
            fnA(n)

            // 処理をわかりやすくするため
            time.Sleep(1 * time.Second)
        }(num)
    }

    // 別のgoroutineで上の全部のgoroutineが終わるまで待つ
    // 終わったらチャネルをclose
    go func() {
        defer close(sem)
        wg.Wait()
    }()
}

https://play.golang.org/p/0MbVqYjU-B3

同時並列数の制御 その他のサンプル

同時並列数の制御として上とほとんど変わらないけど、後で書いたやつがあるのでサンプルとして載せておく 上ではwg.Wait()部分のみをgoroutineで抜き出していたが、これは関数の中身全体を一つのgoroutineで囲っている - こういう書き方もあるよって書いておきたかった - この書き方の問題は、全体をgo func()で囲っているために、関数の本体が長いと見づらくなるので自分はあまりしない

package main

import (
    "fmt"
    "log"
    "sync"
    "time"
)

func worker(msg string) <-chan string {
    var wg sync.WaitGroup
    res := make(chan string)
    limit := make(chan int, 3)
    go func() {
        for i := 0; i <= 10; i++ {
            limit <- 1
            fmt.Println("len", len(limit))

            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                // 1秒かかる処理のつもり
                // 同時にlimitのバッファサイズ単位で処理していることがわかりやすいようにSleep
                time.Sleep(1 * time.Second)

                res <- fmt.Sprintf("%s done %d", msg, i)
                <-limit
            }(i)
        }
        wg.Wait()
        close(res)
    }()
    return res
}

func main() {
    res := worker("job")
    for v := range res {
        log.Println(v)
    }
}

実行結果

len 1
len 2
len 3
len 3
2019/08/08 06:47:18 job done 1
2019/08/08 06:47:18 job done 2
2019/08/08 06:47:18 job done 0
len 3
len 3
len 3
2019/08/08 06:47:19 job done 5
2019/08/08 06:47:19 job done 3
2019/08/08 06:47:19 job done 4
len 3
len 3
len 3
2019/08/08 06:47:20 job done 8
2019/08/08 06:47:20 job done 6
2019/08/08 06:47:20 job done 7
len 3
2019/08/08 06:47:21 job done 10
2019/08/08 06:47:21 job done 9

【同時並列数の制御】3. 並列数を制限してエラー処理もする場合

上のコードで、fnAがエラーを返す場合のエラー処理を入れる場合は以下になる

package main

import (
    "fmt"
    "log"
    "sync"
    "time"
)

func main() {
    if err := doTask(); err != nil {
        log.Printf("error occured. %v", err)
    }
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

var errFlag bool = true

func doTask() error {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    sem := make(chan struct{}, concurrency)
    errChan := make(chan error, len(numbers))
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()
            if err := fnA(n); err != nil {
                errChan <- fmt.Errorf("failed to A, %v", err)
                log.Printf("--> fnA len(errChan) %d", len(errChan))

                time.Sleep(1 * time.Second) // 処理をわかりやすくするため
                return
            }
            time.Sleep(1 * time.Second) // 処理をわかりやすくするため
        }(num)
    }

    go func() {
        defer close(sem)
        defer close(errChan)
        wg.Wait()
    }()

    for err := range errChan {
        return err
    }
    return nil
}

func fnA(n int) error {
    log.Println("do fnA.")
    if errFlag {
        log.Printf("--> failed to do fnA. num: %d", n)
        return fmt.Errorf("error A. num: %d", n)
    }
    log.Printf("--> succeeded to do fnA. num: %d", n)
    return nil
}
  • goroutine内で生じたエラーを外に伝えるために、errChanというチャネルを用意 errChan := make(chan error, len(numbers))
    • このチャネルのバッファ数が重要!!
  • fnAの実行時にエラーが発生した場合はerrChanに送信
  • errChanからエラーを読み取って、errを返す
for err := range errChan {
    return err
}
  • wg.Wait()のあとに close(errChan) もする

errChanのバッファ数を起動されるgoroutineの数(ここではnumbersの6)だけ用意することで、エラーが複数発生してもチャネルが詰まらないようにしているのがポイント

https://play.golang.org/p/KdQB7fLn9Na

実行結果

2019/06/16 08:46:51 do fnA.
2019/06/16 08:46:51 --> failed to do fnA. num: 2
2019/06/16 08:46:51 --> fnA len(errChan) 1
2019/06/16 08:46:51 do fnA.
2019/06/16 08:46:51 --> failed to do fnA. num: 1
2019/06/16 08:46:51 --> fnA len(errChan) 2
2019/06/16 08:46:52 do fnA.
2019/06/16 08:46:52 --> failed to do fnA. num: 3
2019/06/16 08:46:52 --> fnA len(errChan) 3
2019/06/16 08:46:52 do fnA.
2019/06/16 08:46:52 --> failed to do fnA. num: 4
2019/06/16 08:46:52 --> fnA len(errChan) 4
2019/06/16 08:46:53 do fnA.
2019/06/16 08:46:53 --> failed to do fnA. num: 5
2019/06/16 08:46:53 --> fnA len(errChan) 5
2019/06/16 08:46:53 error occured. failed to A, error A. num: 2
2019/06/16 08:46:53 finished
  • エラーが発生するたびにerrChanのバッファが埋まっていく様子がわかる
バッファ数が足りないとどうなるか?

試しに、errChanのバッファ数を0にすると、読み取り手がいないエラーを複数投げようとして詰まってdeadlockが発生する errChan := make(chan error)

https://play.golang.org/p/Zy7xu6k8U9Y

実行結果

2019/06/16 08:25:37 do fnA.
2019/06/16 08:25:37 --> failed to do fnA.
2019/06/16 08:25:37 do fnA.
2019/06/16 08:25:37 --> failed to do fnA.
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
以下省略

参考

上のコードが全部成功した場合

一応載せておくとこんな感じ

var errFlag bool = false にして実行する

https://play.golang.org/p/ZmRaPLpWC3T

実行結果

2019/06/16 08:18:39 do fnA.
2019/06/16 08:18:39 --> succeeded to do fnA. num: 2
2019/06/16 08:18:39 do fnA.
2019/06/16 08:18:39 --> succeeded to do fnA. num: 1
2019/06/16 08:18:40 do fnA.
2019/06/16 08:18:40 --> succeeded to do fnA. num: 3
2019/06/16 08:18:40 do fnA.
2019/06/16 08:18:40 --> succeeded to do fnA. num: 4
2019/06/16 08:18:41 do fnA.
2019/06/16 08:18:41 --> succeeded to do fnA. num: 5
2019/06/16 08:18:41 do fnA.
2019/06/16 08:18:41 --> succeeded to do fnA. num: 6
2019/06/16 08:18:42 finished

【同時並列数の制御】4. contextを使ってエラー制御をきちんとする

上のエラーが起きたときの挙動を見てみると、エラーが起きてもすぐに終了していないことがわかる

上のコードで、起動時のnumを出力させて見ると以下のようになる

 for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信
        log.Printf("num: %d", num)  ← 出力

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()
            log.Printf("goroutine num: %d", num) ← 出力

https://play.golang.org/p/mlzfPOWDDWt

実行結果

2019/06/16 16:51:31 num: 1
2019/06/16 16:51:31 num: 2
2019/06/16 16:51:31 goroutine n: 2
2019/06/16 16:51:31 do fnA.
2019/06/16 16:51:31 --> failed to do fnA. num: 2
2019/06/16 16:51:31 --> fnA len(errChan) 1
2019/06/16 16:51:31 goroutine n: 1
2019/06/16 16:51:31 do fnA.
2019/06/16 16:51:31 --> failed to do fnA. num: 1
2019/06/16 16:51:31 --> fnA len(errChan) 2
2019/06/16 16:51:32 num: 3
2019/06/16 16:51:32 goroutine n: 3
2019/06/16 16:51:32 do fnA.
2019/06/16 16:51:32 --> failed to do fnA. num: 3
2019/06/16 16:51:32 --> fnA len(errChan) 3
2019/06/16 16:51:32 num: 4
2019/06/16 16:51:32 goroutine n: 4
2019/06/16 16:51:32 do fnA.
2019/06/16 16:51:32 --> failed to do fnA. num: 4
2019/06/16 16:51:32 --> fnA len(errChan) 4
2019/06/16 16:51:33 num: 5
2019/06/16 16:51:33 goroutine n: 5
2019/06/16 16:51:33 do fnA.
2019/06/16 16:51:33 --> failed to do fnA. num: 5
2019/06/16 16:51:33 --> fnA len(errChan) 5
2019/06/16 16:51:33 num: 6
2019/06/16 16:51:33 error occured. failed to A, error A. num: 2
2019/06/16 16:51:33 finished

これはリソースの無駄なので、エラーが起きたら即終了させるようにしたい

こういうときはcontextが便利

「【同時並列数の制御】3」のソースコードをcontextを使って以下のように書き直す

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

func main() {
    if err := doTask(); err != nil {
        log.Printf("error occured. %v", err)
    }
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

var errFlag bool = true

func doTask() error {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background()) // contextとキャンセル関数を定義
    defer cancel() // doTask終了時に子プロセスを全て終了するようにしたい

    sem := make(chan struct{}, concurrency)
    errChan := make(chan error, len(numbers))
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信
        log.Printf("num: %d", num)

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()

            log.Printf("goroutine num: %d", n)
            select {
            case <-ctx.Done(): // contextのcancelが呼び出されたらここに入って即終了する
                return
            default:
            }
            if err := fnA(n); err != nil {
                errChan <- fmt.Errorf("failed to A, %v", err)
                log.Printf("--> fnA len(errChan) %d", len(errChan))

                // エラーが発生したら他の処理はキャンセル
                cancel()
                time.Sleep(1 * time.Second) // 処理をわかりやすくするため
                return
            }
            time.Sleep(1 * time.Second) // 処理をわかりやすくするため
        }(num)
    }

    go func() {
        defer close(sem)
        defer close(errChan)
        wg.Wait()
    }()

    for err := range errChan {
        return err
    }
    return nil
}

func fnA(n int) error {
    log.Println("do fnA.")
    if errFlag {
        log.Printf("--> failed to do fnA. num: %d", n)
        return fmt.Errorf("error A. num: %d", n)
    }
    log.Printf("--> succeeded to do fnA. num: %d", n)
    return nil
}
  • contextのcancelが呼び出されたら「<-ctx.Done()」を受け取って即終了するようにする
select {
case <-ctx.Done():
    return
default:
}
  • エラーが発生したら他の処理はキャンセルするため cancel() を送る

https://play.golang.org/p/N1mjZlo51VV

実行結果

2019/06/16 16:54:11 num: 1
2019/06/16 16:54:11 num: 2
2019/06/16 16:54:11 goroutine num: 3
2019/06/16 16:54:11 do fnA.
2019/06/16 16:54:11 --> failed to do fnA. num: 2
2019/06/16 16:54:11 --> fnA len(errChan) 1
2019/06/16 16:54:11 goroutine num: 3
2019/06/16 16:54:11 num: 3
2019/06/16 16:54:11 goroutine num: 4
2019/06/16 16:54:11 num: 4
2019/06/16 16:54:11 goroutine num: 5
2019/06/16 16:54:11 num: 5
2019/06/16 16:54:11 goroutine num: 6
2019/06/16 16:54:11 num: 6
2019/06/16 16:54:11 error occured. failed to A, error A. num: 2
2019/06/16 16:54:11 finished
  • 「do fnA. 」は一度しか呼び出されていない
  • 一つエラーが発生したら、それ以外のgoroutineは起動してもすぐに処理が終わっていることがわかる

【同時並列数の制御】5. contextに加えてerrgroupを使ってエラー制御をかんたんにする

errgroupを使うことで、エラー制御が便利になる。

以下は、syncの代わりにerrgroupを使っている

  • go get golang.org/x/sync/errgroup でerrgroupを取得
  • errChanは使わないで済むようになった
  • 失敗した時の他の処理の取り消しはcancelを書かなくても勝手にやってくれる
package main

import (
    "context"
    "fmt"
    "log"

    "golang.org/x/sync/errgroup"
)

func main() {
    if err := doTask(); err != nil {
        log.Printf("error occured. %v", err)
    }
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

var errFlag bool = true

func doTask() error {
    numbers := []int{1, 2, 3, 4, 5, 6}

    eg, ctx := errgroup.WithContext(context.Background())

    sem := make(chan struct{}, concurrency)
    defer close(sem)
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信
        log.Printf("num: %d", num)

        n := num
        eg.Go(func() error {
            defer func() { <-sem }()
            log.Printf("goroutine num: %d", n)
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }
            if err := fnA(n); err != nil {
                return fmt.Errorf("failed to A, %v", err)
            }
            return nil
        })
    }

    if err := eg.Wait(); err != nil {
        return err
    }
    return nil
}

func fnA(n int) error {
    log.Println("do fnA.")
    if errFlag {
        log.Printf("--> failed to do fnA. num: %d", n)
        return fmt.Errorf("error A. num: %d", n)
    }
    log.Printf("--> succeeded to do fnA. num: %d", n)
    return nil
}

https://play.golang.org/p/ycVHbLn45b6

実行結果

2019/06/16 17:17:31 num: 1
2019/06/16 17:17:31 num: 2
2019/06/16 17:17:31 goroutine num: 2
2019/06/16 17:17:31 do fnA.
2019/06/16 17:17:31 --> failed to do fnA. num: 2
2019/06/16 17:17:31 goroutine num: 1
2019/06/16 17:17:31 num: 3
2019/06/16 17:17:31 goroutine num: 3
2019/06/16 17:17:31 num: 4
2019/06/16 17:17:31 goroutine num: 4
2019/06/16 17:17:31 num: 5
2019/06/16 17:17:31 goroutine num: 5
2019/06/16 17:17:31 num: 6
2019/06/16 17:17:31 goroutine num: 6
2019/06/16 17:17:32 error occured. failed to A, error A. num: 2
2019/06/16 17:17:32 finished

参考: