ludwig125のブログ

頑張りすぎずに頑張る父

go言語のpipeline、fan-in、fan-out

関連

ludwig125.hatenablog.com

pipeline(パイプライン)

Go言語による並行処理を読んでパイプラインやファンイン、ファンアウトについて自分なりに理解したので具体例とともに挙動を書く

O'Reilly Japan - Go言語による並行処理

pipelineや、fan-in(ファンイン)、fan-out(ファンアウト)についてその挙動を見るために、同じ題材に異なる3つ方法で対応してみる

題材

- 6つの作業(タスク)を処理する場合を考える
- 各作業は互いに依存がなく並行処理が可能
- タスク番号とそれに応じた必要作業時間が与えられる

対応方法

  1. goroutineを一切使わない方法で処理する
  2. pipelineを使う
  3. fan-inとfan-outを使った並行処理をする

以下ではTaskという構造体にTask番号とTaskにかかるコスト(作業時間)を定義して、複数Taskを処理するのにどれだけ時間がかかるか、どのように処理されていくかを見てみる

Task構造体とtaskList

type Task struct {
    Number int
    Cost   time.Duration
}
taskList = []Task{
    Task{1, 1 * time.Second},
    Task{2, 7 * time.Second},
    Task{3, 2 * time.Second},
    Task{4, 3 * time.Second},
    Task{5, 5 * time.Second},
    Task{6, 3 * time.Second},
}

■1. goroutineを一切使わない場合

まずはgoroutineを一切使わない場合から考える - 書くのも処理の結果も一番簡単で分かりやすい - 各TaskのListを順番に受け取って、TaskのCost分Sleepしている

package main

import (
    "log"
    "time"
)

// Task はtask番号とtaskにかかるcost(作業時間)をまとめた構造体
type Task struct {
    Number int
    Cost   time.Duration
}

// taskを処理して処理済みのTask番号をSliceとして返す関数
func doTask(tasks []Task) []int {
    var doneTaskList []int
    for _, task := range tasks {
        log.Printf("do task number: %d\n", task.Number)
        // taskのための処理をする
        // ここではtask にかかるCostだけSleepする
        time.Sleep(task.Cost)
        doneTaskList = append(doneTaskList, task.Number) // 処理済みtask番号をlistにつめる
    }
    return doneTaskList
}

func main() {
    start := time.Now()

    // taskListに各Task番号とそのCostを定義する
    taskList := []Task{
        Task{1, 1 * time.Second},
        Task{2, 7 * time.Second},
        Task{3, 2 * time.Second},
        Task{4, 3 * time.Second},
        Task{5, 5 * time.Second},
        Task{6, 3 * time.Second},
    }

    count := 0
    for _, d := range doTask(taskList) { // 処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }
    log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

これを実行すると以下の結果になる

$go run goroutine/fanin/fanin3/nogoroutine/main.go 
2019/09/30 05:03:25 do task number: 1
2019/09/30 05:03:26 do task number: 2
2019/09/30 05:03:33 do task number: 3
2019/09/30 05:03:35 do task number: 4
2019/09/30 05:03:38 do task number: 5
2019/09/30 05:03:43 do task number: 6
2019/09/30 05:03:46 done task number: 1
2019/09/30 05:03:46 done task number: 2
2019/09/30 05:03:46 done task number: 3
2019/09/30 05:03:46 done task number: 4
2019/09/30 05:03:46 done task number: 5
2019/09/30 05:03:46 done task number: 6
2019/09/30 05:03:46 Finished. Done 6 tasks. Total time: 21.004066s
  • すべてのTaskが終わってから、最後に処理されたTask番号を出力している様子がわかる
  • 処理時間の合計は、Costの合計とほぼ同じ

■2. pipelineを使う

上のコードをchannelを使った方法で書き直してみる。

pipelineにするだけではうまみが感じられないが、 これは、後に書くfan-inとfan-outのために重要な理解になる。

package main

import (
    "context"
    "log"
    "time"
)

// Task はtask番号とtaskにかかるcost(作業時間)をまとめた構造体
type Task struct {
    Number int
    Cost   time.Duration
}

// taskをchannel化するgenerator
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
    taskCh := make(chan Task)

    go func() {
        defer close(taskCh)
        for _, task := range taskList {
            select {
            case <-ctx.Done():
                return
            case taskCh <- task: // taskをchannelにつめる
            }
        }
    }()
    return taskCh
}

// taskを処理して処理済みのTask番号をchannelとして返す関数
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
    doneTaskCh := make(chan int)
    go func() {
        defer close(doneTaskCh)
        for task := range taskCh {
            select {
            case <-ctx.Done():
                return
            default:
            }
            log.Printf("do task number: %d\n", task.Number)
            // taskのための処理をする
            // ここではtask にかかるCostだけSleepする
            time.Sleep(task.Cost)
            doneTaskCh <- task.Number // 処理済みtask番号をchannelにつめる
        }
    }()
    return doneTaskCh
}

func main() {
    start := time.Now()
    // 処理の途中で中断されてもgoroutineリークしないようにcontextを使う
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // taskListに各Task番号とそのCostを定義する
    taskList := []Task{
        Task{1, 1 * time.Second},
        Task{2, 7 * time.Second},
        Task{3, 2 * time.Second},
        Task{4, 3 * time.Second},
        Task{5, 5 * time.Second},
        Task{6, 3 * time.Second},
    }

    // taskChannelGerenatorとdoTaskという2つのステージをまとめたpipelineを定義する
    pipeline := doTask(ctx, taskChannelGerenator(ctx, taskList))
    count := 0
    for d := range pipeline { // pipelineから処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }

    log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

以下内容の説明

taskChannelGerenator
  • taskChannelGerenatorという関数を最初に呼び出して、これにtaskListを渡している
  • taskChannelGerenatorは、taskListを受け取って、これをTask型のchannelにする関数
  • pipelineの最初に変数をchannelに変換する処理が多く使われいて、一般にgenとかgeneratorなどと呼ばれることが多い
doTask
  • doTaskは、generatorの作ったTask channelを「for task := range taskCh」でそれぞれ処理する
  • goroutineの外部から終了を受け取って止まるようにするために、以下のselect文を挟んでいる
select {
case <-ctx.Done():
    return
default:
}
  • 最後に、「doneTaskCh <- task.Number」の部分で処理済みtask番号をchannelにつめる
main
  • 処理の途中で中断されてもgoroutineリークしないようにcontextを定義
  • 処理の最後に必ずすべて終わるようにdefer cancelをつける
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
  • taskChannelGerenatorとdoTaskという2つの関数をまとめたpipelineを定義する
  • generatorもdoTaskもchannelを受け取ってchannelを返す関数
  • このように入力と出力の型を同じにして、自由に組み合わせ可能な処理をそれぞれステージと呼び、ステージのまとまりをpipelineと呼ぶ
pipeline := doTask(ctx, taskChannelGerenator(ctx, taskList))
  • pipelineから処理済みtaskの番号を読み出し
for d := range pipeline {
略
}

実行結果

$go run goroutine/fanin/fanin3/before/main.go     
2019/09/30 05:15:52 do task number: 1
2019/09/30 05:15:53 do task number: 2
2019/09/30 05:15:53 done task number: 1
2019/09/30 05:16:00 do task number: 3
2019/09/30 05:16:00 done task number: 2
2019/09/30 05:16:02 do task number: 4
2019/09/30 05:16:02 done task number: 3
2019/09/30 05:16:05 do task number: 5
2019/09/30 05:16:05 done task number: 4
2019/09/30 05:16:10 do task number: 6
2019/09/30 05:16:10 done task number: 5
2019/09/30 05:16:13 done task number: 6
2019/09/30 05:16:13 Finished. Done 6 tasks. Total time: 21.006284s
  • doTaskの処理がすべて終わってからdoneが出力されているわけではないことがわかる
  • ただ、この時点ではpipeline化する前と処理の合計時間が変わらないのでうれしいことがあまりない

■3. fan-inとfan-outを使った並行処理をする

上のpipelineの速度を改善するために、doTaskの処理を並行で複数いっぺんに処理することを考える

1本のpipelineの流れを複数に分ける処理(同時に複数のgoroutineを起動する処理)をfan-out(ファンアウト)と呼び、 複数のgoroutineで処理されたchannelを1本に集約する処理をfan-in(ファンイン)と呼ぶ

package main

import (
    "context"
    "log"
    "sync"
    "time"
)

// Task はtask番号とtaskにかかるcost(作業時間)をまとめた構造体
type Task struct {
    Number int
    Cost   time.Duration
}

// taskをchannel化するgenerator
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
    taskCh := make(chan Task)

    go func() {
        defer close(taskCh)
        for _, task := range taskList {
            select {
            case <-ctx.Done():
                return
            case taskCh <- task:
            }
        }
    }()
    return taskCh
}

// taskを処理して処理済みのTask番号をchannelとして返す関数
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
    doneTaskCh := make(chan int)
    go func() {
        defer close(doneTaskCh)
        for task := range taskCh {
            select {
            case <-ctx.Done():
                return
            default:
                log.Printf("do task number: %d\n", task.Number)
                // taskのための処理をする
                // ここではtask にかかるCostだけSleepする
                time.Sleep(task.Cost)
                doneTaskCh <- task.Number // 処理済みtask番号をchannelにつめる
            }
        }
    }()
    return doneTaskCh
}

func merge(ctx context.Context, taskChs []<-chan int) <-chan int {
    var wg sync.WaitGroup
    mergedTaskCh := make(chan int)

    mergeTask := func(taskCh <-chan int) {
        defer wg.Done()
        for t := range taskCh {
            select {
            case <-ctx.Done():
                return
            case mergedTaskCh <- t:
            }
        }
    }

    wg.Add(len(taskChs))
    for _, taskCh := range taskChs {
        go mergeTask(taskCh)
    }
    // 全てのtaskが処理されるまで待つ
    go func() {
        wg.Wait()
        close(mergedTaskCh)
    }()
    return mergedTaskCh
}

func main() {
    start := time.Now()
    // 処理の途中で中断されてもgoroutineリークしないようにcontextを使う(done channelでもいい)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // taskListに各Task番号とそのCostを定義する
    taskList := []Task{
        Task{1, 1 * time.Second},
        Task{2, 7 * time.Second},
        Task{3, 2 * time.Second},
        Task{4, 3 * time.Second},
        Task{5, 5 * time.Second},
        Task{6, 3 * time.Second},
    }
    taskCh := taskChannelGerenator(ctx, taskList)

    numWorkers := 4
    workers := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        workers[i] = doTask(ctx, taskCh)
    }

    count := 0
    for d := range merge(ctx, workers) { // mergeから処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }
    log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

以下内容の説明

main
  • 一旦taskChannelGerenatorで作ったchannelを受け取り、doTask(ctx, taskCh)を複数for文で実行する
  • 実行結果(intのsliceのchannel)はworkersに格納する
  • doTaskを複数のworkersに処理を分けているので、これがfan-out処理にあたる
taskCh := taskChannelGerenator(ctx, taskList)
numWorkers := 4
workers := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
    workers[i] = doTask(ctx, taskCh)
}
merge
  • merge関数は引数としてworkersが渡した複数のchannel「[]<-chan int」を受け取って、単一のchannel「<-chan int」を返す関数
  • 複数channelを一つにmergeしているので、これがfan-inにあたる
func merge(ctx context.Context, taskChs []<-chan int) <-chan int 
  • 以下の部分では、taskChsの数だけchannelを受け取ってそれぞれのchannelをgoroutineとして起動したmergeTaskに渡す
    • ここでmergeTaskに渡しているchannelは複数のうち1つだけだということに注意
    • 複数channelを複数goroutineでそれぞれ処理するということ
  • 最後に複数goroutineの待ち合わせをする必要があるので、事前にtaskChsの数だけwg.Addしている
wg.Add(len(taskChs))
for _, taskCh := range taskChs {
    go mergeTask(taskCh)
}
  • mergeTaskでは処理が終わったら「defer wg.Done()」でwgの数を減らすようにする
  • for select文の中で、ctx.Done()を受け取る部分は処理の中断用で前述と同じ
  • 事前にmergedTaskChという単一のchannelを定義する
  • goroutineでそれぞれ処理されるmergeTask関数の中で、受け取ったchannelは、このmergedTaskChに詰められる
    • ここで複数のchannelが1つになっている
 mergedTaskCh := make(chan int)
    mergeTask := func(taskCh <-chan int) {
        defer wg.Done()
        for t := range taskCh {
            select {
            case <-ctx.Done():
                return
            case mergedTaskCh <- t:
            }
        }
    }
  • 最後に、wgを待ち合わせて、mergedTaskChをcloseして、mergedTaskChを返す
 // 全てのtaskが処理されるまで待つ
    go func() {
        wg.Wait()
        close(mergedTaskCh)
    }()
    return mergedTaskCh
}
再びmain関数
  • merge関数が返すchannelを受け取って、Task番号を出力する
 for d := range merge(ctx, workers) { // mergeから処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }

実行結果

$go run goroutine/fanin/fanin3/after/main.go 
2019/09/30 06:19:14 do task number: 1
2019/09/30 06:19:14 do task number: 4
2019/09/30 06:19:14 do task number: 2
2019/09/30 06:19:14 do task number: 3
2019/09/30 06:19:15 do task number: 5
2019/09/30 06:19:15 done task number: 1
2019/09/30 06:19:16 do task number: 6
2019/09/30 06:19:16 done task number: 3
2019/09/30 06:19:17 done task number: 4
2019/09/30 06:19:19 done task number: 6
2019/09/30 06:19:20 done task number: 5
2019/09/30 06:19:21 done task number: 2
2019/09/30 06:19:21 Finished. Done 6 tasks. Total time: 7.001934s
  • numWorkersを4にして実行すると上の結果が得られた
  • 同時並行で4つのpipelineが起動するので、これだけ早くなっている

numWorkersをいくつにするかは、使っているマシンのコア数などによって決まってくる