ludwig125のブログ

頑張りすぎずに頑張る父

go言語のpipeline、fan-in、fan-out

関連

ludwig125.hatenablog.com

pipeline(パイプライン)

Go言語による並行処理を読んでパイプラインやファンイン、ファンアウトについて自分なりに理解したので具体例とともに挙動を書く

O'Reilly Japan - Go言語による並行処理

pipelineや、fan-in(ファンイン)、fan-out(ファンアウト)についてその挙動を見るために、同じ題材に異なる3つ方法で対応してみる

題材

- 6つの作業(タスク)を処理する場合を考える
- 各作業は互いに依存がなく並行処理が可能
- タスク番号とそれに応じた必要作業時間が与えられる

対応方法

  1. goroutineを一切使わない方法で処理する
  2. pipelineを使う
  3. fan-inとfan-outを使った並行処理をする

以下ではTaskという構造体にTask番号とTaskにかかるコスト(作業時間)を定義して、複数Taskを処理するのにどれだけ時間がかかるか、どのように処理されていくかを見てみる

Task構造体とtaskList

type Task struct {
    Number int
    Cost   time.Duration
}
taskList = []Task{
    Task{1, 1 * time.Second},
    Task{2, 7 * time.Second},
    Task{3, 2 * time.Second},
    Task{4, 3 * time.Second},
    Task{5, 5 * time.Second},
    Task{6, 3 * time.Second},
}

■1. goroutineを一切使わない場合

まずはgoroutineを一切使わない場合から考える - 書くのも処理の結果も一番簡単で分かりやすい - 各TaskのListを順番に受け取って、TaskのCost分Sleepしている

package main

import (
    "log"
    "time"
)

// Task はtask番号とtaskにかかるcost(作業時間)をまとめた構造体
type Task struct {
    Number int
    Cost   time.Duration
}

// taskを処理して処理済みのTask番号をSliceとして返す関数
func doTask(tasks []Task) []int {
    var doneTaskList []int
    for _, task := range tasks {
        log.Printf("do task number: %d\n", task.Number)
        // taskのための処理をする
        // ここではtask にかかるCostだけSleepする
        time.Sleep(task.Cost)
        doneTaskList = append(doneTaskList, task.Number) // 処理済みtask番号をlistにつめる
    }
    return doneTaskList
}

func main() {
    start := time.Now()

    // taskListに各Task番号とそのCostを定義する
    taskList := []Task{
        Task{1, 1 * time.Second},
        Task{2, 7 * time.Second},
        Task{3, 2 * time.Second},
        Task{4, 3 * time.Second},
        Task{5, 5 * time.Second},
        Task{6, 3 * time.Second},
    }

    count := 0
    for _, d := range doTask(taskList) { // 処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }
    log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

これを実行すると以下の結果になる

$go run goroutine/fanin/fanin3/nogoroutine/main.go 
2019/09/30 05:03:25 do task number: 1
2019/09/30 05:03:26 do task number: 2
2019/09/30 05:03:33 do task number: 3
2019/09/30 05:03:35 do task number: 4
2019/09/30 05:03:38 do task number: 5
2019/09/30 05:03:43 do task number: 6
2019/09/30 05:03:46 done task number: 1
2019/09/30 05:03:46 done task number: 2
2019/09/30 05:03:46 done task number: 3
2019/09/30 05:03:46 done task number: 4
2019/09/30 05:03:46 done task number: 5
2019/09/30 05:03:46 done task number: 6
2019/09/30 05:03:46 Finished. Done 6 tasks. Total time: 21.004066s
  • すべてのTaskが終わってから、最後に処理されたTask番号を出力している様子がわかる
  • 処理時間の合計は、Costの合計とほぼ同じ

■2. pipelineを使う

上のコードをchannelを使った方法で書き直してみる。

pipelineにするだけではうまみが感じられないが、 これは、後に書くfan-inとfan-outのために重要な理解になる。

package main

import (
    "context"
    "log"
    "time"
)

// Task はtask番号とtaskにかかるcost(作業時間)をまとめた構造体
type Task struct {
    Number int
    Cost   time.Duration
}

// taskをchannel化するgenerator
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
    taskCh := make(chan Task)

    go func() {
        defer close(taskCh)
        for _, task := range taskList {
            select {
            case <-ctx.Done():
                return
            case taskCh <- task: // taskをchannelにつめる
            }
        }
    }()
    return taskCh
}

// taskを処理して処理済みのTask番号をchannelとして返す関数
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
    doneTaskCh := make(chan int)
    go func() {
        defer close(doneTaskCh)
        for task := range taskCh {
            select {
            case <-ctx.Done():
                return
            default:
            }
            log.Printf("do task number: %d\n", task.Number)
            // taskのための処理をする
            // ここではtask にかかるCostだけSleepする
            time.Sleep(task.Cost)
            doneTaskCh <- task.Number // 処理済みtask番号をchannelにつめる
        }
    }()
    return doneTaskCh
}

func main() {
    start := time.Now()
    // 処理の途中で中断されてもgoroutineリークしないようにcontextを使う
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // taskListに各Task番号とそのCostを定義する
    taskList := []Task{
        Task{1, 1 * time.Second},
        Task{2, 7 * time.Second},
        Task{3, 2 * time.Second},
        Task{4, 3 * time.Second},
        Task{5, 5 * time.Second},
        Task{6, 3 * time.Second},
    }

    // taskChannelGerenatorとdoTaskという2つのステージをまとめたpipelineを定義する
    pipeline := doTask(ctx, taskChannelGerenator(ctx, taskList))
    count := 0
    for d := range pipeline { // pipelineから処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }

    log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

以下内容の説明

taskChannelGerenator
  • taskChannelGerenatorという関数を最初に呼び出して、これにtaskListを渡している
  • taskChannelGerenatorは、taskListを受け取って、これをTask型のchannelにする関数
  • pipelineの最初に変数をchannelに変換する処理が多く使われいて、一般にgenとかgeneratorなどと呼ばれることが多い
doTask
  • doTaskは、generatorの作ったTask channelを「for task := range taskCh」でそれぞれ処理する
  • goroutineの外部から終了を受け取って止まるようにするために、以下のselect文を挟んでいる
select {
case <-ctx.Done():
    return
default:
}
  • 最後に、「doneTaskCh <- task.Number」の部分で処理済みtask番号をchannelにつめる
main
  • 処理の途中で中断されてもgoroutineリークしないようにcontextを定義
  • 処理の最後に必ずすべて終わるようにdefer cancelをつける
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
  • taskChannelGerenatorとdoTaskという2つの関数をまとめたpipelineを定義する
  • generatorもdoTaskもchannelを受け取ってchannelを返す関数
  • このように入力と出力の型を同じにして、自由に組み合わせ可能な処理をそれぞれステージと呼び、ステージのまとまりをpipelineと呼ぶ
pipeline := doTask(ctx, taskChannelGerenator(ctx, taskList))
  • pipelineから処理済みtaskの番号を読み出し
for d := range pipeline {
略
}

実行結果

$go run goroutine/fanin/fanin3/before/main.go     
2019/09/30 05:15:52 do task number: 1
2019/09/30 05:15:53 do task number: 2
2019/09/30 05:15:53 done task number: 1
2019/09/30 05:16:00 do task number: 3
2019/09/30 05:16:00 done task number: 2
2019/09/30 05:16:02 do task number: 4
2019/09/30 05:16:02 done task number: 3
2019/09/30 05:16:05 do task number: 5
2019/09/30 05:16:05 done task number: 4
2019/09/30 05:16:10 do task number: 6
2019/09/30 05:16:10 done task number: 5
2019/09/30 05:16:13 done task number: 6
2019/09/30 05:16:13 Finished. Done 6 tasks. Total time: 21.006284s
  • doTaskの処理がすべて終わってからdoneが出力されているわけではないことがわかる
  • ただ、この時点ではpipeline化する前と処理の合計時間が変わらないのでうれしいことがあまりない

■3. fan-inとfan-outを使った並行処理をする

上のpipelineの速度を改善するために、doTaskの処理を並行で複数いっぺんに処理することを考える

1本のpipelineの流れを複数に分ける処理(同時に複数のgoroutineを起動する処理)をfan-out(ファンアウト)と呼び、 複数のgoroutineで処理されたchannelを1本に集約する処理をfan-in(ファンイン)と呼ぶ

package main

import (
    "context"
    "log"
    "sync"
    "time"
)

// Task はtask番号とtaskにかかるcost(作業時間)をまとめた構造体
type Task struct {
    Number int
    Cost   time.Duration
}

// taskをchannel化するgenerator
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
    taskCh := make(chan Task)

    go func() {
        defer close(taskCh)
        for _, task := range taskList {
            select {
            case <-ctx.Done():
                return
            case taskCh <- task:
            }
        }
    }()
    return taskCh
}

// taskを処理して処理済みのTask番号をchannelとして返す関数
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
    doneTaskCh := make(chan int)
    go func() {
        defer close(doneTaskCh)
        for task := range taskCh {
            select {
            case <-ctx.Done():
                return
            default:
                log.Printf("do task number: %d\n", task.Number)
                // taskのための処理をする
                // ここではtask にかかるCostだけSleepする
                time.Sleep(task.Cost)
                doneTaskCh <- task.Number // 処理済みtask番号をchannelにつめる
            }
        }
    }()
    return doneTaskCh
}

func merge(ctx context.Context, taskChs []<-chan int) <-chan int {
    var wg sync.WaitGroup
    mergedTaskCh := make(chan int)

    mergeTask := func(taskCh <-chan int) {
        defer wg.Done()
        for t := range taskCh {
            select {
            case <-ctx.Done():
                return
            case mergedTaskCh <- t:
            }
        }
    }

    wg.Add(len(taskChs))
    for _, taskCh := range taskChs {
        go mergeTask(taskCh)
    }
    // 全てのtaskが処理されるまで待つ
    go func() {
        wg.Wait()
        close(mergedTaskCh)
    }()
    return mergedTaskCh
}

func main() {
    start := time.Now()
    // 処理の途中で中断されてもgoroutineリークしないようにcontextを使う(done channelでもいい)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // taskListに各Task番号とそのCostを定義する
    taskList := []Task{
        Task{1, 1 * time.Second},
        Task{2, 7 * time.Second},
        Task{3, 2 * time.Second},
        Task{4, 3 * time.Second},
        Task{5, 5 * time.Second},
        Task{6, 3 * time.Second},
    }
    taskCh := taskChannelGerenator(ctx, taskList)

    numWorkers := 4
    workers := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        workers[i] = doTask(ctx, taskCh)
    }

    count := 0
    for d := range merge(ctx, workers) { // mergeから処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }
    log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

以下内容の説明

main
  • 一旦taskChannelGerenatorで作ったchannelを受け取り、doTask(ctx, taskCh)を複数for文で実行する
  • 実行結果(intのsliceのchannel)はworkersに格納する
  • doTaskを複数のworkersに処理を分けているので、これがfan-out処理にあたる
taskCh := taskChannelGerenator(ctx, taskList)
numWorkers := 4
workers := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
    workers[i] = doTask(ctx, taskCh)
}
merge
  • merge関数は引数としてworkersが渡した複数のchannel「[]<-chan int」を受け取って、単一のchannel「<-chan int」を返す関数
  • 複数channelを一つにmergeしているので、これがfan-inにあたる
func merge(ctx context.Context, taskChs []<-chan int) <-chan int 
  • 以下の部分では、taskChsの数だけchannelを受け取ってそれぞれのchannelをgoroutineとして起動したmergeTaskに渡す
    • ここでmergeTaskに渡しているchannelは複数のうち1つだけだということに注意
    • 複数channelを複数goroutineでそれぞれ処理するということ
  • 最後に複数goroutineの待ち合わせをする必要があるので、事前にtaskChsの数だけwg.Addしている
wg.Add(len(taskChs))
for _, taskCh := range taskChs {
    go mergeTask(taskCh)
}
  • mergeTaskでは処理が終わったら「defer wg.Done()」でwgの数を減らすようにする
  • for select文の中で、ctx.Done()を受け取る部分は処理の中断用で前述と同じ
  • 事前にmergedTaskChという単一のchannelを定義する
  • goroutineでそれぞれ処理されるmergeTask関数の中で、受け取ったchannelは、このmergedTaskChに詰められる
    • ここで複数のchannelが1つになっている
 mergedTaskCh := make(chan int)
    mergeTask := func(taskCh <-chan int) {
        defer wg.Done()
        for t := range taskCh {
            select {
            case <-ctx.Done():
                return
            case mergedTaskCh <- t:
            }
        }
    }
  • 最後に、wgを待ち合わせて、mergedTaskChをcloseして、mergedTaskChを返す
 // 全てのtaskが処理されるまで待つ
    go func() {
        wg.Wait()
        close(mergedTaskCh)
    }()
    return mergedTaskCh
}
再びmain関数
  • merge関数が返すchannelを受け取って、Task番号を出力する
 for d := range merge(ctx, workers) { // mergeから処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }

実行結果

$go run goroutine/fanin/fanin3/after/main.go 
2019/09/30 06:19:14 do task number: 1
2019/09/30 06:19:14 do task number: 4
2019/09/30 06:19:14 do task number: 2
2019/09/30 06:19:14 do task number: 3
2019/09/30 06:19:15 do task number: 5
2019/09/30 06:19:15 done task number: 1
2019/09/30 06:19:16 do task number: 6
2019/09/30 06:19:16 done task number: 3
2019/09/30 06:19:17 done task number: 4
2019/09/30 06:19:19 done task number: 6
2019/09/30 06:19:20 done task number: 5
2019/09/30 06:19:21 done task number: 2
2019/09/30 06:19:21 Finished. Done 6 tasks. Total time: 7.001934s
  • numWorkersを4にして実行すると上の結果が得られた
  • 同時並行で4つのpipelineが起動するので、これだけ早くなっている

numWorkersをいくつにするかは、使っているマシンのコア数などによって決まってくる

go言語で複数のgoroutineのエラーハンドリングをする

関連

ludwig125.hatenablog.com

複数のgoroutineの結果の取得

複数のgoroutineの結果の取得1(エラーが起きると中断する例)

第5章 並行プログラミング―ゴルーチンとチャネルを使いこなす:はじめてのGo―シンプルな言語仕様,型システム,並行処理|gihyo.jp … 技術評論社

こちらのコードを参考に以下のような複数のgoroutineを実行してその結果を取得する場合を考える

package main

import (
        "fmt"
        "log"
        "net/http"
)

func getStatus(urls []string) <-chan string {
        statusChan := make(chan string)
        for _, url := range urls {
                go func(url string) {
                        res, err := http.Get(url)
                        if err != nil {
                                log.Fatal(err)
                        }
                        statusChan <- res.Status
                }(url)
        }
        return statusChan
}

func main() {
        urls := []string{"https://www.google.com", "https://www.yahoo.co.jp/"}
        statusChan := getStatus(urls)

        for i := 0; i < len(urls); i++ {
                fmt.Println(<-statusChan)
        }
}

これを実行すると以下のような結果になる

$go run goroutine0.go 
200 OK
200 OK

しかし、このコードの場合、どれか一つでもhttp.Getに失敗するとそこで処理を中断してしまうようになっている

試しに、上のurlsを以下のように書き換えてみる

urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}

実行するとhttps://badhostの時点で終了する

$go run goroutine0.go
2019/05/12 07:14:01 Get https://badhost: dial tcp: lookup badhost: no such host
exit status 1

もしhttp.Getに失敗した場合に、そのままstatusChan <- res.Statusをしようとすると、 以下のようなnil pointer参照のpanicが起きてしまうので、このコードではhttp.Getが失敗した時点でlog.Fatalによってエラーメッセージとともに処理を中断することになっている

http.Get error: Get https://badhost: dial tcp: lookup badhost: no such host
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x40 pc=0x5ec38e]

エラーが起きても処理を中断してほしくない場合は以下のように書ける

複数のgoroutineの結果の取得2(エラー処理ができない例)

上のコードを元に、エラーが起きてもその他の処理を進めるようにしたのが以下になる

package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    checkStatus := func(urls []string) <-chan *http.Response {
        resultChan := make(chan *http.Response)
        for _, url := range urls {
            go func(url string) {
                resp, err := http.Get(url)
                // http.Getに時間がかかった場合を模するためにsleep                                    
                fmt.Println("sleep 2 sec")
                time.Sleep(2 * time.Second)
                if err != nil {
                    fmt.Printf("http.Get error: %v\n", err)
                }
                resultChan <- resp
            }(url)
        }
        return resultChan
    }
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    resultChan := checkStatus(urls)

    for i := 0; i < len(urls); i++ {
        result := <-resultChan
        if result == nil {
            fmt.Printf("Response: nil\n")
            continue
        }
        fmt.Printf("Response: %s\n", result.Status)
    }
}
  • http.Getでerrorが生じてもnil pointerのpanicが起きないように、「resultChan <- resp」のように結果をまるごと入れている。Statusは関数の呼び出し側で見ることにする
  • http.Getで失敗すると当然respはnilなので、channelの呼び出しの際に、「if result == nil」という条件分岐をしている

これを実行すると以下のようになる

$go run goroutine2.go
sleep 2 sec
sleep 2 sec
sleep 2 sec
http.Get error: Get https://badhost: dial tcp: lookup badhost: no such host
Response: nil
Response: 200 OK
Response: 200 OK

これはなかなか良さそうだが、関数の呼び出し側でエラーハンドリングができないという問題がある。

ここでは、nilが返ってきたことしかわからない

できれば「Get https://badhost: dial tcp: lookup badhost: no such host」を関数の呼び出し側で出力できるようにしたい

複数のgoroutineの結果の取得3(呼び出し側でエラー処理ができるようにした例)

Go言語による並行処理 などを読むと以下のようにErrorとResponseをひとまとめにした構造体を返せばいいとある concurrency-in-go-src/fig-patterns-proper-err-handling.go at 4e55fd7f3f5b9c5efc45a841702393a1485ba206 · kat-co/concurrency-in-go-src · GitHub

これを参考に上のコードを以下のように直してみる

package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    type Result struct {
        Error    error
        Response *http.Response
    }
    checkStatus := func(urls []string) <-chan Result {
        resultChan := make(chan Result)
        for _, url := range urls {
            go func(url string) {
                resp, err := http.Get(url)
                fmt.Println("sleep 2 sec")
                time.Sleep(2 * time.Second)
                resultChan <- Result{Error: err, Response: resp}                                                                   
            }(url)
        }   
        return resultChan
    }   
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    result := checkStatus(urls)
    
    for i := 0; i < len(urls); i++ {
        res := <-result
        if res.Error != nil {
            fmt.Printf("error: %v\n", res.Error)
            continue
        }
        fmt.Printf("Response: %v\n", res.Response.Status)
    }
}   

res.Errorを関数の呼び出し側で出力できた。 これができればエラーの時は~などの処理がやりやすくなりそう

複数のgoroutineの結果の取得4(呼び出し側でエラー処理ができるようにした例 ※for range使用)

上のコードでも問題ないと思うが、勉強のために別の書き方も書いてみたい

ここまではchannelの読み込みを以下のようにしていたが、

result := checkStatus(urls)                                                                                                  
for i := 0; i < len(urls); i++ {                                  
    res := <-result                                               
    ~
}

channelは以下のような受け取り方もできる

for result := range checkStatus(urls) {
  ~
}

ただ、for rangeの書き方をするには注意が必要になる

  • for rangeでchannelを受け取る場合、goroutine側でchennelのcloseが必要になる
    • closeしないと終わりがわからずmain側のfor range文が延々と待ち続けることになる
  • では単純にgoroutineの中で以下のように「defer close(results) 」してもいいのかというとそうではない
// うまくいかない例
func main() {
    type Result struct {
        Error    error
        Response *http.Response
    }
    checkStatus := func(urls []string) <-chan Result {
        resultChan := make(chan Result, 10)                                                     
        
        for _, url := range urls {
            go func(url string) {
                defer close(resultChan) // ここでcloseすると複数goroutine全部を待てない
                resp, err := http.Get(url)
                fmt.Println("sleep 2 sec")
                time.Sleep(2 * time.Second)
                resultChan <- Result{Error: err, Response: resp}
            }(url)
        }   
        return resultChan
    }   
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    
    for result := range checkStatus(urls) {
        if result.Error != nil {
            fmt.Printf("error: %v\n", result.Error)
            continue
        }
        fmt.Printf("Response: %v\n", result.Response.Status)
    }
}   

これを実行すると以下のようになる

sleep 2 sec
sleep 2 sec
sleep 2 sec
error: Get https://badhost: dial tcp: lookup badhost: no such host
  • goroutineが一つしか実行されていない

  • 複数のgoroutineの処理を待ってからclose(resultChan)をするために、sync.WaitGroupを使ってみた

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

func main() {
    type Result struct {
        Error    error
        Response *http.Response
    }
    checkStatus := func(urls []string) <-chan Result {
        resultChan := make(chan Result, 10)
        wg := new(sync.WaitGroup)

        defer close(resultChan)
        for _, url := range urls {
            wg.Add(1)
            go func(url string) {
                defer wg.Done()
                resp, err := http.Get(url)                                                                               
                fmt.Println("sleep 2 sec")
                time.Sleep(2 * time.Second)
                resultChan <- Result{Error: err, Response: resp}
            }(url)
        }   
        wg.Wait()
        return resultChan
    }   
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    
    for result := range checkStatus(urls) {
        if result.Error != nil {
            fmt.Printf("error: %v\n", result.Error)
            continue
        }
        fmt.Printf("Response: %v\n", result.Response.Status)
    }
}   

実行結果は以下のようになる

sleep 2 sec
sleep 2 sec
sleep 2 sec
error: Get https://badhost: dial tcp: lookup badhost: no such host
Response: 200 OK
Response: 200 OK

やりたいことが実現できた

複数のgoroutineの結果の取得5(呼び出し側でエラー処理ができるようにした例 ※for range使用 goroutineリークを避けるように工夫したもの)

上のコードは、selectを使って外部からgoroutineを中断するように変えると、goroutineリークによってメモリ使用率を圧迫することを避けることができる

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

func main() {
    type Result struct {
        Error    error
        Response *http.Response
    }
    checkStatus := func(done <-chan interface{}, urls []string) <-chan Result {
        resultChan := make(chan Result, 10)
        wg := new(sync.WaitGroup)

        defer close(resultChan)
        for _, url := range urls {
            wg.Add(1)
            go func(url string) {
                defer wg.Done()
                resp, err := http.Get(url)
                fmt.Println("sleep 2 sec")
                time.Sleep(2 * time.Second)
                select {
                case <-done:
                    return
                case resultChan <- Result{Error: err, Response: resp}:
                }
            }(url)
        }
        wg.Wait()
        return resultChan
    }

    done := make(chan interface{})
    defer close(done)
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}

    for result := range checkStatus(done, urls) {
        if result.Error != nil {
            fmt.Printf("error: %v\n", result.Error)
            continue
        }
        fmt.Printf("Response: %v\n", result.Response.Status)
    }
}

実行結果はこうなる

sleep 2 sec
sleep 2 sec
sleep 2 sec
error: Get https://badhost: dial tcp: lookup badhost: no such host
Response: 200 OK
Response: 200 OK

go言語で同時並列数を制御する

関連

ludwig125.hatenablog.com

同時並列数の制御

【同時並列数の制御】1. 並列数を制限しない場合

  • 並列数を制限しない場合はこの通り単純
  • 複数のgoroutineを起動する場合は、WaitGroupで待ち合わせをする
  • ※time.Sleep(1 * time.Second)は処理の様子をわかりやすくするため入れているだけで、実用では必要ない
package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    doTask()
    log.Println("finished")
}

func doTask() {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    for _, num := range numbers {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            fnA(n)

            // 処理をわかりやすくするため
            time.Sleep(1 * time.Second)
        }(num)
    }
    wg.Wait()
}

func fnA(n int) {
    log.Printf("do fnA. num: %d \n", n)
}

https://play.golang.org/p/JxdXBOThF0v

実行結果

2019/06/15 16:57:46 do fnA. num: 6 
2019/06/15 16:57:46 do fnA. num: 1 
2019/06/15 16:57:46 do fnA. num: 2 
2019/06/15 16:57:46 do fnA. num: 3 
2019/06/15 16:57:46 do fnA. num: 4 
2019/06/15 16:57:46 do fnA. num: 5 
2019/06/15 16:57:47 finished
  • 全てのgoroutineが同時に起動して、それぞれ1秒Sleepしたあとでfinishedが出力されている

【同時並列数の制御】2. 並列数を制限する場合

並列数を制限する場合 - 最大同時並列実行数をバッファサイズとしたチャネルを作り、そのチャネルの待ち合わせをすることで実現できる - semチャネルは、一旦concurrency数だけ受信したらバッファがいっぱいになるので、「<-sem」が呼ばれて解放されない限り、後続のgoroutineは起動しない => 最大同時並列実行数を制限できる

package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    doTask()
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

func doTask() {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    sem := make(chan struct{}, concurrency) // concurrency数のバッファ
    for _, num := range numbers {
        sem <- struct{}{}

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }() // 処理が終わったらチャネルを解放
            fnA(n)

            // 処理をわかりやすくするため
            time.Sleep(1 * time.Second)
        }(num)
    }
    wg.Wait()
}

func fnA(n int) {
    log.Printf("do fnA. num: %d \n", n)
}

https://play.golang.org/p/CEn0tw5SR-A

実行結果

2019/06/15 17:20:36 do fnA. num: 2 
2019/06/15 17:20:36 do fnA. num: 1 
2019/06/15 17:20:37 do fnA. num: 3 
2019/06/15 17:20:37 do fnA. num: 4 
2019/06/15 17:20:38 do fnA. num: 5 
2019/06/15 17:20:38 do fnA. num: 6 
2019/06/15 17:20:39 finished
  • concurrency数ずつ(ここでは2つずつ)1秒おきに実行されていることがわかる

参考

上のコードの「sem <- struct{}{}」の後ろでlen(sem)を出力してみると、一旦semチャネルのバッファがconcurrency数=2に達したら、あとは2を保ったまま後続のgoroutineが起動しているのがわかる

sem <- struct{}{}
fmt.Printf("len(sem): %d\n", len(sem)) // <- バッファ内の値を出力

実行結果

len(sem): 1
len(sem): 2
2019/06/15 20:54:28 do fnA. num: 2 
2019/06/15 20:54:28 do fnA. num: 1 
len(sem): 2
2019/06/15 20:54:29 do fnA. num: 3 
len(sem): 2
2019/06/15 20:54:29 do fnA. num: 4 
len(sem): 2
2019/06/15 20:54:30 do fnA. num: 5 
len(sem): 2
2019/06/15 20:54:30 do fnA. num: 6 
2019/06/15 20:54:31 finished

【同時並列数の制御】2-2. 並列数を制限する場合(チャネルを最後にcloseする)

上のをちょっと改良 - チャネルを使ったら最後にcloseしておいた方が安全なので、 - 以下のように全部のgoroutineを待って最後にチャネルをcloseするために、別のgoroutineを用意しておくと良い

  • goroutineが1つだけの場合は最初のgo func()内に、「defer close(チャネル)」を呼び出せばいいが、今回のように複数のgoroutineを待つ場合はこのように書くのが良さそう
func doTask() {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    sem := make(chan struct{}, concurrency)
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()
            fnA(n)

            // 処理をわかりやすくするため
            time.Sleep(1 * time.Second)
        }(num)
    }

    // 別のgoroutineで上の全部のgoroutineが終わるまで待つ
    // 終わったらチャネルをclose
    go func() {
        defer close(sem)
        wg.Wait()
    }()
}

https://play.golang.org/p/0MbVqYjU-B3

同時並列数の制御 その他のサンプル

同時並列数の制御として上とほとんど変わらないけど、後で書いたやつがあるのでサンプルとして載せておく 上ではwg.Wait()部分のみをgoroutineで抜き出していたが、これは関数の中身全体を一つのgoroutineで囲っている - こういう書き方もあるよって書いておきたかった - この書き方の問題は、全体をgo func()で囲っているために、関数の本体が長いと見づらくなるので自分はあまりしない

package main

import (
    "fmt"
    "log"
    "sync"
    "time"
)

func worker(msg string) <-chan string {
    var wg sync.WaitGroup
    res := make(chan string)
    limit := make(chan int, 3)
    go func() {
        for i := 0; i <= 10; i++ {
            limit <- 1
            fmt.Println("len", len(limit))

            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                // 1秒かかる処理のつもり
                // 同時にlimitのバッファサイズ単位で処理していることがわかりやすいようにSleep
                time.Sleep(1 * time.Second)

                res <- fmt.Sprintf("%s done %d", msg, i)
                <-limit
            }(i)
        }
        wg.Wait()
        close(res)
    }()
    return res
}

func main() {
    res := worker("job")
    for v := range res {
        log.Println(v)
    }
}

実行結果

len 1
len 2
len 3
len 3
2019/08/08 06:47:18 job done 1
2019/08/08 06:47:18 job done 2
2019/08/08 06:47:18 job done 0
len 3
len 3
len 3
2019/08/08 06:47:19 job done 5
2019/08/08 06:47:19 job done 3
2019/08/08 06:47:19 job done 4
len 3
len 3
len 3
2019/08/08 06:47:20 job done 8
2019/08/08 06:47:20 job done 6
2019/08/08 06:47:20 job done 7
len 3
2019/08/08 06:47:21 job done 10
2019/08/08 06:47:21 job done 9

【同時並列数の制御】3. 並列数を制限してエラー処理もする場合

上のコードで、fnAがエラーを返す場合のエラー処理を入れる場合は以下になる

package main

import (
    "fmt"
    "log"
    "sync"
    "time"
)

func main() {
    if err := doTask(); err != nil {
        log.Printf("error occured. %v", err)
    }
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

var errFlag bool = true

func doTask() error {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    sem := make(chan struct{}, concurrency)
    errChan := make(chan error, len(numbers))
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()
            if err := fnA(n); err != nil {
                errChan <- fmt.Errorf("failed to A, %v", err)
                log.Printf("--> fnA len(errChan) %d", len(errChan))

                time.Sleep(1 * time.Second) // 処理をわかりやすくするため
                return
            }
            time.Sleep(1 * time.Second) // 処理をわかりやすくするため
        }(num)
    }

    go func() {
        defer close(sem)
        defer close(errChan)
        wg.Wait()
    }()

    for err := range errChan {
        return err
    }
    return nil
}

func fnA(n int) error {
    log.Println("do fnA.")
    if errFlag {
        log.Printf("--> failed to do fnA. num: %d", n)
        return fmt.Errorf("error A. num: %d", n)
    }
    log.Printf("--> succeeded to do fnA. num: %d", n)
    return nil
}
  • goroutine内で生じたエラーを外に伝えるために、errChanというチャネルを用意 errChan := make(chan error, len(numbers))
    • このチャネルのバッファ数が重要!!
  • fnAの実行時にエラーが発生した場合はerrChanに送信
  • errChanからエラーを読み取って、errを返す
for err := range errChan {
    return err
}
  • wg.Wait()のあとに close(errChan) もする

errChanのバッファ数を起動されるgoroutineの数(ここではnumbersの6)だけ用意することで、エラーが複数発生してもチャネルが詰まらないようにしているのがポイント

https://play.golang.org/p/KdQB7fLn9Na

実行結果

2019/06/16 08:46:51 do fnA.
2019/06/16 08:46:51 --> failed to do fnA. num: 2
2019/06/16 08:46:51 --> fnA len(errChan) 1
2019/06/16 08:46:51 do fnA.
2019/06/16 08:46:51 --> failed to do fnA. num: 1
2019/06/16 08:46:51 --> fnA len(errChan) 2
2019/06/16 08:46:52 do fnA.
2019/06/16 08:46:52 --> failed to do fnA. num: 3
2019/06/16 08:46:52 --> fnA len(errChan) 3
2019/06/16 08:46:52 do fnA.
2019/06/16 08:46:52 --> failed to do fnA. num: 4
2019/06/16 08:46:52 --> fnA len(errChan) 4
2019/06/16 08:46:53 do fnA.
2019/06/16 08:46:53 --> failed to do fnA. num: 5
2019/06/16 08:46:53 --> fnA len(errChan) 5
2019/06/16 08:46:53 error occured. failed to A, error A. num: 2
2019/06/16 08:46:53 finished
  • エラーが発生するたびにerrChanのバッファが埋まっていく様子がわかる
バッファ数が足りないとどうなるか?

試しに、errChanのバッファ数を0にすると、読み取り手がいないエラーを複数投げようとして詰まってdeadlockが発生する errChan := make(chan error)

https://play.golang.org/p/Zy7xu6k8U9Y

実行結果

2019/06/16 08:25:37 do fnA.
2019/06/16 08:25:37 --> failed to do fnA.
2019/06/16 08:25:37 do fnA.
2019/06/16 08:25:37 --> failed to do fnA.
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
以下省略

参考

上のコードが全部成功した場合

一応載せておくとこんな感じ

var errFlag bool = false にして実行する

https://play.golang.org/p/ZmRaPLpWC3T

実行結果

2019/06/16 08:18:39 do fnA.
2019/06/16 08:18:39 --> succeeded to do fnA. num: 2
2019/06/16 08:18:39 do fnA.
2019/06/16 08:18:39 --> succeeded to do fnA. num: 1
2019/06/16 08:18:40 do fnA.
2019/06/16 08:18:40 --> succeeded to do fnA. num: 3
2019/06/16 08:18:40 do fnA.
2019/06/16 08:18:40 --> succeeded to do fnA. num: 4
2019/06/16 08:18:41 do fnA.
2019/06/16 08:18:41 --> succeeded to do fnA. num: 5
2019/06/16 08:18:41 do fnA.
2019/06/16 08:18:41 --> succeeded to do fnA. num: 6
2019/06/16 08:18:42 finished

【同時並列数の制御】4. contextを使ってエラー制御をきちんとする

上のエラーが起きたときの挙動を見てみると、エラーが起きてもすぐに終了していないことがわかる

上のコードで、起動時のnumを出力させて見ると以下のようになる

 for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信
        log.Printf("num: %d", num)  ← 出力

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()
            log.Printf("goroutine num: %d", num) ← 出力

https://play.golang.org/p/mlzfPOWDDWt

実行結果

2019/06/16 16:51:31 num: 1
2019/06/16 16:51:31 num: 2
2019/06/16 16:51:31 goroutine n: 2
2019/06/16 16:51:31 do fnA.
2019/06/16 16:51:31 --> failed to do fnA. num: 2
2019/06/16 16:51:31 --> fnA len(errChan) 1
2019/06/16 16:51:31 goroutine n: 1
2019/06/16 16:51:31 do fnA.
2019/06/16 16:51:31 --> failed to do fnA. num: 1
2019/06/16 16:51:31 --> fnA len(errChan) 2
2019/06/16 16:51:32 num: 3
2019/06/16 16:51:32 goroutine n: 3
2019/06/16 16:51:32 do fnA.
2019/06/16 16:51:32 --> failed to do fnA. num: 3
2019/06/16 16:51:32 --> fnA len(errChan) 3
2019/06/16 16:51:32 num: 4
2019/06/16 16:51:32 goroutine n: 4
2019/06/16 16:51:32 do fnA.
2019/06/16 16:51:32 --> failed to do fnA. num: 4
2019/06/16 16:51:32 --> fnA len(errChan) 4
2019/06/16 16:51:33 num: 5
2019/06/16 16:51:33 goroutine n: 5
2019/06/16 16:51:33 do fnA.
2019/06/16 16:51:33 --> failed to do fnA. num: 5
2019/06/16 16:51:33 --> fnA len(errChan) 5
2019/06/16 16:51:33 num: 6
2019/06/16 16:51:33 error occured. failed to A, error A. num: 2
2019/06/16 16:51:33 finished

これはリソースの無駄なので、エラーが起きたら即終了させるようにしたい

こういうときはcontextが便利

「【同時並列数の制御】3」のソースコードをcontextを使って以下のように書き直す

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

func main() {
    if err := doTask(); err != nil {
        log.Printf("error occured. %v", err)
    }
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

var errFlag bool = true

func doTask() error {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background()) // contextとキャンセル関数を定義
    defer cancel() // doTask終了時に子プロセスを全て終了するようにしたい

    sem := make(chan struct{}, concurrency)
    errChan := make(chan error, len(numbers))
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信
        log.Printf("num: %d", num)

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()

            log.Printf("goroutine num: %d", n)
            select {
            case <-ctx.Done(): // contextのcancelが呼び出されたらここに入って即終了する
                return
            default:
            }
            if err := fnA(n); err != nil {
                errChan <- fmt.Errorf("failed to A, %v", err)
                log.Printf("--> fnA len(errChan) %d", len(errChan))

                // エラーが発生したら他の処理はキャンセル
                cancel()
                time.Sleep(1 * time.Second) // 処理をわかりやすくするため
                return
            }
            time.Sleep(1 * time.Second) // 処理をわかりやすくするため
        }(num)
    }

    go func() {
        defer close(sem)
        defer close(errChan)
        wg.Wait()
    }()

    for err := range errChan {
        return err
    }
    return nil
}

func fnA(n int) error {
    log.Println("do fnA.")
    if errFlag {
        log.Printf("--> failed to do fnA. num: %d", n)
        return fmt.Errorf("error A. num: %d", n)
    }
    log.Printf("--> succeeded to do fnA. num: %d", n)
    return nil
}
  • contextのcancelが呼び出されたら「<-ctx.Done()」を受け取って即終了するようにする
select {
case <-ctx.Done():
    return
default:
}
  • エラーが発生したら他の処理はキャンセルするため cancel() を送る

https://play.golang.org/p/N1mjZlo51VV

実行結果

2019/06/16 16:54:11 num: 1
2019/06/16 16:54:11 num: 2
2019/06/16 16:54:11 goroutine num: 3
2019/06/16 16:54:11 do fnA.
2019/06/16 16:54:11 --> failed to do fnA. num: 2
2019/06/16 16:54:11 --> fnA len(errChan) 1
2019/06/16 16:54:11 goroutine num: 3
2019/06/16 16:54:11 num: 3
2019/06/16 16:54:11 goroutine num: 4
2019/06/16 16:54:11 num: 4
2019/06/16 16:54:11 goroutine num: 5
2019/06/16 16:54:11 num: 5
2019/06/16 16:54:11 goroutine num: 6
2019/06/16 16:54:11 num: 6
2019/06/16 16:54:11 error occured. failed to A, error A. num: 2
2019/06/16 16:54:11 finished
  • 「do fnA. 」は一度しか呼び出されていない
  • 一つエラーが発生したら、それ以外のgoroutineは起動してもすぐに処理が終わっていることがわかる

【同時並列数の制御】5. contextに加えてerrgroupを使ってエラー制御をかんたんにする

errgroupを使うことで、エラー制御が便利になる。

以下は、syncの代わりにerrgroupを使っている

  • go get golang.org/x/sync/errgroup でerrgroupを取得
  • errChanは使わないで済むようになった
  • 失敗した時の他の処理の取り消しはcancelを書かなくても勝手にやってくれる
package main

import (
    "context"
    "fmt"
    "log"

    "golang.org/x/sync/errgroup"
)

func main() {
    if err := doTask(); err != nil {
        log.Printf("error occured. %v", err)
    }
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

var errFlag bool = true

func doTask() error {
    numbers := []int{1, 2, 3, 4, 5, 6}

    eg, ctx := errgroup.WithContext(context.Background())

    sem := make(chan struct{}, concurrency)
    defer close(sem)
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信
        log.Printf("num: %d", num)

        n := num
        eg.Go(func() error {
            defer func() { <-sem }()
            log.Printf("goroutine num: %d", n)
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }
            if err := fnA(n); err != nil {
                return fmt.Errorf("failed to A, %v", err)
            }
            return nil
        })
    }

    if err := eg.Wait(); err != nil {
        return err
    }
    return nil
}

func fnA(n int) error {
    log.Println("do fnA.")
    if errFlag {
        log.Printf("--> failed to do fnA. num: %d", n)
        return fmt.Errorf("error A. num: %d", n)
    }
    log.Printf("--> succeeded to do fnA. num: %d", n)
    return nil
}

https://play.golang.org/p/ycVHbLn45b6

実行結果

2019/06/16 17:17:31 num: 1
2019/06/16 17:17:31 num: 2
2019/06/16 17:17:31 goroutine num: 2
2019/06/16 17:17:31 do fnA.
2019/06/16 17:17:31 --> failed to do fnA. num: 2
2019/06/16 17:17:31 goroutine num: 1
2019/06/16 17:17:31 num: 3
2019/06/16 17:17:31 goroutine num: 3
2019/06/16 17:17:31 num: 4
2019/06/16 17:17:31 goroutine num: 4
2019/06/16 17:17:31 num: 5
2019/06/16 17:17:31 goroutine num: 5
2019/06/16 17:17:31 num: 6
2019/06/16 17:17:31 goroutine num: 6
2019/06/16 17:17:32 error occured. failed to A, error A. num: 2
2019/06/16 17:17:32 finished

参考:

go言語でシグナルをきちんとエラーハンドリングする

関連

並行処理全般に関するメモは以下 go言語の並行処理 - ludwig125のブログ

go言語でsignalを適切に処理する方法を調べたので例をいくつか

シグナルを受け付けて関数を適切に終了させる例1

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {

    sigs := make(chan os.Signal, 1)
    ctx, cancel := context.WithCancel(context.Background())

    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    defer func() {
        // シグナルの受付を終了する
        signal.Stop(sigs)
        cancel()
    }()

    go func() {
        select {
        case sig := <-sigs: // シグナルを受け取ったらここに入る
            fmt.Println("Got signal!", sig)
            cancel() // cancelを呼び出して全ての処理を終了させる
        }
    }()

    if err := doTask(ctx); err != nil {
        fmt.Printf("failed to doTask: %v", err)
        cancel() // 何らかのエラーが発生した場合、他の処理も全てcancelさせる
        return
    }
    fmt.Println("done successfully.")
}

func doTask(ctx context.Context) error {
    defer fmt.Println("done doTask")
    for i := 0; i < 5; i++ {
        select {
        case <-ctx.Done():
            fmt.Println("received done")
            return ctx.Err()
        default:
        }
        // // エラー時の挙動が見たい場合はここのコメントアウトを外す
        // if i == 3 {
        //  return fmt.Errorf("error happened")
        // }

        // do something
        fmt.Println("sleep 1. count:", i)
        time.Sleep(1 * time.Second)
    }
    return nil
}

動作確認

順にそれぞれの挙動を見てみる

正常終了時

$go run signal3/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
sleep 1. count: 3
sleep 1. count: 4
done doTask
done successfully.

異常終了時(上のコメントアウトを外した時)

$go run signal3/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
done doTask
failed to doTask: error happened%                                                                                                               

Ctrl+Cをした時

$go run signal3/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
^CGot signal! interrupt
received done
done doTask
failed to doTask: context canceled%                                                                                                             
  • Ctrl+Cのとき、signalを受け取った後cancelされていることがわかる

シグナルを受け付けて関数を適切に終了させる例2(タスクの中身がgoroutine)

練習がてら、doTaskの中身をgoroutineにしてみた場合も考えてみた

package main

import (
    "fmt"
    "os"
    "os/signal"
    "time"

    "golang.org/x/net/context"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, os.Interrupt)
    defer func() {
        // シグナルの受付を終了する
        signal.Stop(sigs)
        cancel()
    }()
    go func() {
        select {
        case sig := <-sigs: // シグナルを受け取ったらここに入る
            fmt.Println("Got signal!", sig)
            cancel() // cancelを呼び出して全ての処理を終了させる
            return
        }
    }()

    res, err := doTask(ctx)
    for v := range res {
        fmt.Println("done successfully.", v)
    }
    for e := range err {
        fmt.Printf("failed to doTask: %v", e)
        cancel() // 何らかのエラーが発生した場合、他の処理も全てcancelさせる
        return
    }
}

func doTask(ctx context.Context) (<-chan string, <-chan error) {
    resCh := make(chan string)
    errCh := make(chan error, 5)
    go func() {
        defer fmt.Println("done doTask")
        defer close(resCh)
        defer close(errCh)
        for i := 0; i < 5; i++ {
            select {
            case <-ctx.Done():
                fmt.Println("received done")
                // Do something before terminated
                time.Sleep(500 * time.Millisecond)
                errCh <- ctx.Err()
                return
            default:
            }
            // // エラー時の挙動が見たい場合はここのコメントアウトを外す
            // if i == 3 {
            //  errCh <- fmt.Errorf("error happened")
            //  return
            // }

            // do something
            fmt.Println("sleep 1. count:", i)
            time.Sleep(time.Second)
        }
        resCh <- fmt.Sprintf("something")
    }()
    return resCh, errCh
}

動作確認

正常終了時

$go run signal5/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
sleep 1. count: 3
sleep 1. count: 4
done doTask
done successfully. something

異常終了時(上のコメントアウトを外した時)

$go run signal5/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
done doTask
failed to doTask: error happened%                                                                                                               

Ctrl+Cをした時

$go run signal5/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
^CGot signal! interrupt
received done
done doTask
failed to doTask: context canceled%                                                                                                             

問題なさそう

シグナルを受け付けて関数を適切に終了させる例3(独自のcontextを用意する)

こちらの記事で紹介されていたNewCtxを使ってみる Managing Groups of Goroutines in Go - The Startup - Medium

これは、signalを処理してcontextのcancelを送るctxを定義するもの

シグナルを受け付けるまで処理し続けるような場合(main側でエラー時にcancelを使う必要がない場合)では、上のコードがかなり減った

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "golang.org/x/net/context"
)

func main() {
    res, err := doTask(newCtx())
    for v := range res {
        fmt.Println("done successfully.", v)
    }
    for e := range err {
        fmt.Printf("failed to doTask: %v", e)
        return
    }
}

func newCtx() context.Context {
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        sCh := make(chan os.Signal, 1)
        signal.Notify(sCh, syscall.SIGINT, syscall.SIGTERM)
        <-sCh
        fmt.Println("Got signal!")
        cancel()
    }()
    return ctx
}

func doTask(ctx context.Context) (<-chan string, <-chan error) {
    resCh := make(chan string)
    errCh := make(chan error, 5)
    go func() {
        defer fmt.Println("done doTask")
        defer close(resCh)
        defer close(errCh)
        for i := 0; i < 5; i++ {
            select {
            case <-ctx.Done():
                fmt.Println("received done")
                // Do something before terminated
                time.Sleep(500 * time.Millisecond)
                errCh <- ctx.Err()
                return
            default:
            }

            // do something
            fmt.Println("sleep 1. count:", i)
            time.Sleep(time.Second)
        }
        resCh <- fmt.Sprintf("something")
    }()
    return resCh, errCh
}

実行結果

シグナル送った時

$go run signal_pattern/signal6/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
^CGot signal!
received done
done doTask
failed to doTask: context canceled

正常終了時

$go run signal_pattern/signal6/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
sleep 1. count: 3
sleep 1. count: 4
done doTask
done successfully. something

Ubuntu 18.04.2のgoのversionを最新にする

目的

go modを使いたかったのでgoを1.11以上にする必要があった せっかくなので最新にした

インストール方法参考

公式

環境

手順

ほとんどこのままできた How To Install Go 1.13 on Ubuntu 18.04 & 16.04 LTS – TecAdmin

Install

sudo apt-get update
sudo apt-get -y upgrade

最新のバージョンをこちらで確認して取ってくる

  • 現在の最新版は以下のリンクから確認できる

golang.org

wget https://dl.google.com/go/go1.12.7.linux-amd64.tar.gz

取ってきたgoを配置

sudo tar -C /usr/local -xzf  go1.12.7.linux-amd64.tar.gz

-C オプションをつけることで特定のディレクトリ先に解凍できる

Setup Go Environment

$ export GOROOT=/usr/local/go

GOPATHの設定

  • 自分の場合はGOPATHを以下にしている
$ export GOPATH=/home/$USER/go

$ echo $GOPATH
/home/ludwig125/go
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH

Verify Installation

$go version
go version go1.12.7 linux/amd64

set path

自分はzshを使っているので、.zshrcに以下を記載 これで次回から /usr/local/go 以下が読み込まれるようになる

# go のPATHを指定
export GOROOT=/usr/local/go
export GOPATH=/home/$USER/go
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
  • 最初これを忘れてて、毎回起動時に /usr/bin 以下のgoが優先されて使われて困った

おわり

同一Lan内からLinuxの複数のファイルをまとめてダウンロードさせる

目的

家や会社などの、同じLan内の別のデバイスから複数のファイルをまとめてダウンロードできるようにする

具体例

他の人に複数の写真の共有をしたいときに、

  • メールなどで送るのめんどくさい
  • Dropboxなどのラウドサービスを使いたくない
  • USBメモリとかで受け渡ししたくない

そういうときに、Linuxなどpythonやzipコマンドが使える環境であれば簡単に(?)できる

環境

Ubuntu 18.04.2 LTS

python3 (python2でもいいはず)

手順

複数ファイルを以下のようにディレクトリにまとめて (ここではディレクトリ名は「0803」にしていたが任意)

[~/tmp/0803] $ls
P1020416.JPG*  P1020420.JPG*  P1020424.JPG*  P1020428.JPG*  P1020432.JPG*  P1020436.JPG*  P1020440.JPG*  P1020444.JPG*  P1020448.JPG*
P1020417.JPG*  P1020421.JPG*  P1020425.JPG*  P1020429.JPG*  P1020433.JPG*  P1020437.JPG*  P1020441.JPG*  P1020445.JPG*  P1020449.JPG*

以下でzip圧縮

zip -r 0803.zip 0803

圧縮したディレクトリと同じディレクトリ内でpythonでサーバを立てる

python3 -m http.server --cgi 8181

サーバを立てた状態であれば、同一Lan内のどのデバイスからでも以下で接続して0803.zipがダウンロードできる状態になっているはず(IPアドレスの確認方法は後述)

http://192.168.3.11:8181/

※ローカルIPアドレスを調べる方法はifconfig

  • 自分の例では192.168.3.11
$hostname -I
192.168.3.11 172.17.0.1 

参考:

ludwig125.hatenablog.com

GKEのチュートリアルでkubectlをインストールできなかった

cloud.google.com

こちらの方法に従ってkubectlをインストールしようとしたらエラーが出て失敗

gcloud components install kubectl
$ gcloud components install kubectl
 ERROR: (gcloud.components.install) 
You cannot perform this action because the Cloud SDK component manager 
is disabled for this installation. You can run the following command 
to achieve the same result for this installation: 

sudo apt-get install kubectl

sudo apt-get install kubectl も失敗した

$ sudo apt-get install kubectl
E: パッケージ 'kubectl ' にはインストール候補がありません

Kubernetes set-up on ubuntu on Google compute - Stack Overflow

ERROR: (gcloud.components.update) The component manager is disabled for this installation

It is a known issue on the Google Cloud SDK issue tracker : Issue 336: kubectl not installed by google-cloud-sdk debian package, and not installable

Unfortunately, it provides a poor experience for first timer testing kubernetes as it's hard to find a quick AND CLEAN step by step solution.

Here is one:

とあったので、以下のコマンドを逐次実行したことで解決した

sudo apt-get update
sudo apt-get remove google-cloud-sdk
curl https://sdk.cloud.google.com | bash
exec -l $SHELL
gcloud init
gcloud components list
gcloud components install kubectl
gcloud components list

注意:もともと入っているgcloud関連パッケージが消えてなくなるので、gcloud components listをして何が入っていたのかなど確認してから実行したほうがいい