ludwig125のブログ

頑張りすぎずに頑張る父

GKE を格安で使うためにやったこと

GKE を格安で使うためにやったこと

目的

GKEで高機能なマシンを使いたいけどお金がかかるのがネックだったので、コストを最小にするために取り組んだことを書いた

GKE (Google Kubernetes Engine)のドキュメントはこれ

やりたいこと

  • GKEを使ってある程度(1時間以上)時間がかかるバッチ処理をやりたい
  • できれば処理の負荷に応じて動かすマシンのスペックを変えたいのでGAEやFaasよりもKubernetesを使いたい
  • バッチ処理なので、一日のうち稼働していない時間の方が長い
  • できれば使った分だけ課金されるようにしたいが、GKEは後述する通りクラスタが存在する限り課金されてしまう
  • 勉強のためにKubernetesを使いたい ← 一番の動機で他は後付け

そこで、こうすれば安く使えるかも、と思った方法を以下に記載する

ここで紹介する方法でできないこと

以下の方法は、稼働時間が一日数時間のバッチ処理を安く使う方法なもので、一日中動き続ける必要があるサーバには使えない。

料金の確認

まずGKEにかかる料金を確認する

以下を見ると、ノードは秒単位で課金される, とある

ノードとはGKEのPodを動作させるマシンのこと GCPではGoogle Compute Engine(GCE)上のVMのことを指す

料金 | Kubernetes Engine のドキュメント | Google Cloud

ノードの料金 GKE では、Google Compute Engine インスタンスクラスタ内のノードとして使用します。ノードを削除するまでは、Compute Engine の料金設定に基づいて、これらのインスタンスごとに課金されます。Compute Engine リソースは秒単位で課金され、最小使用料金は 1 分間分です。

ではVMはいくらかかるかというと、標準的なマシンで一番スペックの低い、 仮想CPU数 1、メモリ3.75GBのn1-standard-1でも、一か月間継続利用すると24ドル強かかることがわかる(高い)

すべての料金 | Compute Engine ドキュメント | Google Cloud

image

マシンタイプ 仮想 CPU 数 メモリ 料金(米ドル) プリエンプティブル料金(米ドル)
n1-standard-1 1 3.75 $24.2725 $7.30

もしCPU 4、メモリ15GBのマシンを一か月使用したらこれ1台だけで1万円くらいかかることになる

マシンタイプ 仮想 CPU 数 メモリ 料金(米ドル) プリエンプティブル料金(米ドル)
n1-standard-4 4 15GB $97.0900 $29.20

趣味の開発にはとてもそんな高いお金は払えない

プリエンプティブルについては後述する

一か月の使用料金について

【実例】n1-standard-1でHello Worldを表示するだけのサーバを動かした時の費用

以下、n1-standard-1でHello Worldを表示するだけのサーバを3週間近く放置した結果を見てみる

これは、GKEのチュートリアルHello Worldを表示するサーバを作って、うっかりクラスタを消さなかったらこうなった(実体験)

6876円という金額が請求された!

image

SKU プロダクト SKU ID 使用 費用 1 回限りのクレジット 割引 小計
N1 Predefined Instance Core running in Americas Compute Engine 2E27-4F75-95CD 1,289.2 hour ¥4,431 ¥0 ¥-196 ¥4,235
N1 Predefined Instance Ram running in Americas Compute Engine 6C71-E844-38BC 4,834.49 gibibyte hour ¥2,227 ¥0 ¥-99 ¥2,129
Storage PD Capacity Compute Engine D973-5D65-BAB2 176.97 gibibyte month ¥509 ¥0 ¥0 ¥509
Network Inter Zone Egress Compute Engine DE9E-AFBC-A15A 3.06 gibibyte ¥3 ¥0 ¥0 ¥3

なんでこんなに高いの?

SKUについて

まずこのSKUという料金体系が非常にわかりにくいし、どこにもはっきりと書いていない。

CloudSQLを安くするために考えたこと - ludwig125のブログ

以前↑にも書いたけど、多分SKUというのは最小管理単位 (Stock Keeping Unit) の略だと考えられる。

すべての料金  |  Compute Engine ドキュメント  |  Google Cloud

ここには

米ドル以外の通貨でお支払いの場合は、Cloud Platform SKU に記載されている該当通貨の料金が適用されます。

とあるけど自分が見たときはリンク先のドキュメントは何も書いてなかった(ドキュメントのミス?)

ちなみに以前のSKUはこちららしい

料金の内訳

すべての料金  |  Compute Engine ドキュメント  |  Google Cloud

ここにはn1-standard-1 のマシンが一時間あたり$0.0475 と書いてあるので、 このレポートに書いてある1,289.2時間という使用時間に$0.0475をかけて、 当時のドル円のレート約108円をかけると、

1289.2*0.0475*108=6613.596 となった。

これは、SKUの以下の合計金額とほぼ一致する。

  • N1 Predefined Instance Core running in Americas:¥4,431
  • N1 Predefined Instance Ram running in Americas:¥2,227

上の2項目の合算がn1-standard-1のマシンの使用料金に対応していると考えられる。

1289.2という使用時間について

GKEはクラスタを作る際に、デフォルトで3ノード作成される。つまり3台分のVMの料金が発生している。

gcloud container clusters create  |  Cloud SDK  |  Google Cloud

--num-nodes=NUM_NODES; default=3

そのため、VM一台あたりの稼働日数を計算するためには1289.2時間を3で割って さらに24時間で割ればいい。 1289.2/3/24=17.9 ということで、これはレポートにも表示されているVMの稼働した期間(6/20〜7/7くらい)と大体一致する。

プリエンプティティブルなマシンについて

プリエンプティティブルなマシンは、24時間でシャットダウンされるので長い時間がかかるバッチ処理には向いていない

逆に、短い処理、または途中で止まってもいいような処理の場合はプリエンプティティブルで問題ないので、趣味で使う分にはそちらのほうがいい場合もある

格安でGKEを使おうとする記事はプリエンプティブルなマシンについて紹介されていることが多い

どうやったら安く使えるか考える

プリエンプティブルを使っても、ある程度のスペックのマシンを使おうとするとそれなりに高い。

Kubernetesクラスタを稼働させるVMは、サービスを動かしていようがいまいが存在する限り課金される。 自分が作ろうと思っているバッチ処理の場合、一日数時間動いて残りは無駄に課金されることになる。

これは避けたい。

「必要な分だけ課金されるのがいいならサーバレスとか、GAEを頑張れないか」とも思ったけど、 でもKubernetes勉強のために使ってみたい。

それにKubernetesなら必要に応じてマシンのスペックを変えられるという点もいい。

考えた結果、

という発想から考えたのが以下の構成

構成の概要

image

上の横一列はソースコードのビルド処理を表す

  1. githubにpushすると
  2. circleciがビルドジョブを起動して、
  3. docker imageをGCRにpushする。
  4. circleciのcron機能を使って設定した時刻に、GKEクラスタの作成とデプロイを行うcircleciジョブを実行するようにする
  5. GKEのデプロイをする際に、GCRから最新のDockerImageを取得する
  6. kubernetesのcronJobを数分おきに起動するようにして、いつデプロイされても数分後に実行されるようにしておく
  7. cronJobは、何らの処理(ここではcpu情報などをslackに通知する処理とした)をした後で、
  8. 最後にcircleciジョブのAPIを呼び出してGKEクラスタ削除ジョブを実行させる

1〜3までは多くの人がやっている方法。 普通はこの後デプロイジョブを続けて起動させるのが一般的だが、今回の方法ではこのままデプロイはしない。

下の横一列4〜8が今回自分が考えた部分。

プログラミング言語はGo言語を使用する

クラスタ(VM)が存在する期間を最小にすることで、大幅なコスト削減が見込めるはず

詳細な手順

以下、自分の検証を追う形で詳細な手順を紹介する。 結果は一番最後に記載するので、結果だけ知りたい人は「最終的に作ったもの」を見てください

GCRにdocker pushするまで

まずはcircleciのジョブでGCRにdocker pushするところまで行う。(図の黄色枠で囲まれた部分)

image

事前にGKEのチュートリアルのプロジェクトの作成作業が必要。

チュートリアルに沿って以下でプロジェクトを作成しておく

プロジェクトの選択 -> 新しいプロジェクト

image

gke-test-ludwig125-2 という名前でプロジェクトを作成 (gke-test-ludwig125で設定をミスったので作り直した。これ以降の画像は全てgke-test-ludwig125-2 と読み替える必要がある) image

デフォルトのプロジェクトを変更

gcloud config set project gke-test-ludwig125-2

マシンを置く場所を選ぶ 安いところならどこでもいいので、ここでは以下を選択

gcloud config set compute/zone us-west1-c

main.goの作成 ここではログにCPUを出力するだけの簡単なプログラムとした。

package main

import (
    "log"
    "runtime"
)

func main() {
    log.Println("Hello, World!")
    log.Printf("cpu: %d\n", runtime.NumCPU())
    log.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
}

Dockerfileを以下のように作成

FROM golang:1.13-alpine
WORKDIR /go/src/github.com/ludwig125/gke-test
COPY . .
RUN go install github.com/ludwig125/gke-test
CMD ["gke-test"]

これをcircleci上でbuildする

GKEを使うので、DockerのimageはGCRで管理する

GCRについては以下が参考

.circleci/config.yml を以下のように書く

  • 本当はcircleciを2.1で書きたかったが、後述するcircleci APIが2.0にしか対応していないため、仕方なく2.0で書く

.config.yml

version: 2
jobs:
  build:
    environment:
      PROJECT_NAME: gke-test-ludwig125-2
      IMAGE_NAME: gke-test
    docker:
      - image: google/cloud-sdk
    working_directory: /go/src/github.com/ludwig125/gke-test
    steps:
      - checkout
      - setup_remote_docker:
          version: 18.06.0-ce
      - run:
          name: Setup CLOUD SDK
          command: |
            # base64 -i ignore non-alphabet characters
            echo $GCLOUD_SERVICE_KEY | base64 -di > ${HOME}/gcloud-service-key.json
            gcloud auth activate-service-account --key-file ${HOME}/gcloud-service-key.json
            gcloud --quiet auth configure-docker
      - run:
          name: Docker Build & Push
          command: |
            docker build -t us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}:${CIRCLE_BUILD_NUM} .
            docker tag us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}:${CIRCLE_BUILD_NUM} us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}:latest
            if [ -n "${CIRCLE_TAG}" ]; then
              docker tag us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}:${CIRCLE_BUILD_NUM} us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}:${CIRCLE_TAG}
            fi
            docker push us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}

workflows:
  version: 2
  master-build:
    jobs:
      - build:
          filters:
            branches:
              only: master

上の説明

Setup CLOUD SDK

  • echo $GCLOUD_SERVICE_KEY | base64 -di > ${HOME}/gcloud-service-key.json
    • circleciの環境変数に登録したGCLOUD_SERVICE_KEYをbase64デコードしてcircleciのローカル内に保存
    • base64 -iしないとサービスアカウントのjsonをデコードする際にエラーを出したので必要
  • gcloud auth activate-service-account --key-file ${HOME}/gcloud-service-key.json
    • サービスアカウントを上でリダイレクトしたファイルから読み込み
  • gcloud --quiet auth configure-docker
    • サービスアカウントの認証

Docker Build & Push

これをgithubに上げる

circleciの作業

circleciに登録 image

Linux -> go を選んで Start Buildingボタンを押す

このままではcircleciがGKEのプロジェクトにアクセスできないので失敗するはず

サービスアカウントの追加

circleciからプロジェクトを操作するために、 以下の方法でサービスアカウントを作成しておく

image

I AMと管理 -> サービスアカウント 「サービスアカウントを作成」

gke-test で作成

image

【サービスアカウントの追加】ロールについて

ここで書いている構成にするためには、以下のロール(権限を管理している)が必要(2021/02/09時点)

サービス アカウントの権限(オプション) は以下のようにそれぞれ、

  • Kubernetes Engine」-> 「Kubernetes Engine 管理者」
  • 「Service Accounts」 -> 「サービス アカウント ユーザー」
  • 「ストレージ」-> 「ストレージ管理者」
  • 「Compute Engine」->「Compute インスタンス管理者」(<-2021/02/09追記 1月のバージョンからないと怒られるようになったので)

を選択する

image

(「Compute インスタンス管理者」は後から必要になったので画像にはない)

【サービスアカウントの追加】Kubernetes Engine 管理者

Kubernetes Engine 開発者ではだめ

  • 管理者ではなく開発者を選んでしまうと、後述のcircleciでclusterの作成と削除で以下のようなエラーが出てしまった
  • 必ずKubernetes 管理者を選ぶ必要がある
bash-4.4# gcloud --quiet container clusters delete gke-test-small-cluster
ERROR: (gcloud.container.clusters.delete) Some requests did not succeed:
- ResponseError: code=403, message=Required "container.clusters.delete" permission(s) for "projects/gke-test-ludwig125-2/zones/us-west1-c/clusters/gke-test-small-cluster". See https://cloud.google.com/kubernetes-engine/docs/troubleshooting#gke_service_account_deleted for more info.

bash-4.4# gcloud services enable container.googleapis.com
ERROR: (gcloud.services.enable) PERMISSION_DENIED: The caller does not have permission

参考:

【サービスアカウントの追加】サービス アカウント ユーザー

また、「サービスアカウント ユーザー」がないと、後述のcircleciでサービスアカウントを使ったクラスタが作成できず、以下のようなエラーがでる

(gcloud.container.clusters.create) ResponseError: code=400, message=The user does not have access to service account "default".

参考:

【サービスアカウントの追加】ストレージ管理者

これがないとGCRにDockerImageを上げられない 以下のようなエラーが出る

The push refers to repository [us.gcr.io/gke-test-ludwig125-2/gke-test]


denied: Token exchange failed for project 'gke-test-ludwig125-2'. Caller does not have permission 'storage.buckets.get'. To configure permissions, follow instructions at: https://cloud.google.com/container-registry/docs/access-control
Exited with code 1

アクセス制御の構成  |  Container Registry  |  Google Cloud に書いてある通り、ストレージ管理者が権限に必要

【サービスアカウントの追加】Compute インスタンス管理者

2021/02/09 追記

1/20以降、これがないと、Kubernetesのデプロイ前に以下の設定時にエラーが出るようになった。

確実な原因は特定できず

root@b0b686c15028:~# gcloud config set compute/zone us-central1-f
Updated property [compute/zone].
ERROR: (gcloud.config.set) Some requests did not succeed:
 - Required 'compute.zones.list' permission for 'projects/<自分のプロジェクトID>'

compute.zones.list が必要だというので、以下を参考に、compute.zones.* を権限に含む、Compute インスタンス管理者 をロールに追加した

https://cloud.google.com/compute/docs/access/iam#compute.instanceAdmin.v1

【サービスアカウントの追加】ここまでのロール確認

ここまででIAMを見るとこんな感じ

image

の4つが設定されている

【サービスアカウントの追加】キーの作成以降

キーの作成 JSONを選ぶ image

以下の形式のJSONファイルがダウンロードできるので、ダウンロードしておく

{
  "type": "service_account",
  "project_id": "gke-test-ludwig125-2",
  "private_key_id": "XXXXXXX",
  "private_key": "-----BEGIN PRIVATE KEY-----\nXXXXXXXXXXXXX

以下ではservice_account.jsonの名前で保存したものとする

ローカルのディレクトリに置くときは、githubに上がらないようにgitignoreしておく

$cat .gitignore
service_account.json

これをbase64 encodeして、出力結果をコピー cat service_account.json| base64

circleciのEnvironment Variables に上の結果を登録 config yamlの内容に合わせて GCLOUD_SERVICE_KEYという名前のキーにする image

これでcircleciのジョブを再実行 うまくいくとこんな感じ

image

gcr image

ここまでで参考にさせていただいたページ

circleCIとgithubを連携して、簡単にコードをGCRにpushできるようにしてみた - アプリとサービスのすすめ

GKE+CircleCI 2.0で継続的デプロイ可能なアプリケーションをシュッと作る - Eureka Engineering - Medium

GKEのデプロイ

ここからは、GKEのデプロイ以降の部分について記載する(図の黄色枠で囲まれた部分)

image

GKEの手動デプロイ

ここからは、GKEにcrojobをデプロイする方法を検証する。

まずは手動でcronjobをデプロイする方法を確認する。

クイックスタート | Kubernetes Engine のドキュメント | Google Cloud に従って作る

PROJECT_NAME=gke-test-ludwig125-2
CLUSTER_NAME=gke-test-small-cluster
COMPUTE_ZONE=us-west1-c
gcloud config set project $PROJECT_NAME
gcloud config set compute/zone $COMPUTE_ZONE
gcloud container clusters create $CLUSTER_NAME --preemptible --machine-type=g1-small --num-nodes 3 --disk-size 10 --zone $COMPUTE_ZONE --enable-autoscaling --min-nodes=1 --max-nodes=3

コマンドの詳細は以下 gcloud container clusters create  |  Cloud SDK  |  Google Cloud

ちなみに、「gcloud container clusters create」の際に--preemptibleをつけるとマシンがプリエンプティブルになる。

$gcloud container clusters list
NAME                    LOCATION    MASTER_VERSION  MASTER_IP       MACHINE_TYPE  NODE_VERSION  NUM_NODES  STATUS
gke-test-small-cluster  us-west1-c  1.13.7-gke.8    35.203.156.134  g1-small      1.13.7-gke.8  2          RUNNING
[~/go/src/github.com/gke-test] $

バッチ処理をするためにcronjob.yamlを用意する。 cronjob.yamlはdeployment.yamlとあまり変わらないが、

  • scheduleとconcurrencyPolicyを設定している
  • .spec.containersにportがいらない

などの違いがある

k8s/cronjob.yaml

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: gke-test
  labels:
    app: gke-test
spec:
  # cronJob 参考
  # https://en.wikipedia.org/wiki/Cron
  # https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/
  # https://kubernetes.io/ja/docs/concepts/workloads/controllers/cron-jobs/
  schedule: "*/1 * * * *" # 1分おきに起動
  concurrencyPolicy: Forbid # 前のCronが動いていたら動作しない
  jobTemplate:
    spec:
      template:
        metadata:
          labels:
            app: gke-test
        spec:
          containers:
          - name: gke-test-container
            image: us.gcr.io/gke-test-ludwig125-2/gke-test
            imagePullPolicy: Always
            command: ["gke-test"]
            resources:
              requests:
                memory: 512Mi
          restartPolicy: Never # Cron失敗時にコンテナを再起動しない

cron参考

自分はkustomizeを使いたいのでGKE(kubernetes)にdeploy

$kustomize build k8s | kubectl apply -f -
cronjob.batch/gke-test created

※今回はcronjob.yamlのファイルが1つだけなので、kustomizeを使わなくても以下のように直接ファイルを指定しても同じ

$kubectl apply -f k8s/cronjob.yaml
cronjob.batch/gke-test created

結局どちらも以下のyamlをデプロイしているだけなことが、 kustomize buildの結果を見るとわかる(余談)

$kustomize build k8s
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  labels:
    app: gke-test
  name: gke-test
spec:
  concurrencyPolicy: Forbid
  jobTemplate:
    metadata:
      labels:
        app: gke-test
    spec:
      template:
        metadata:
          labels:
            app: gke-test
        spec:
          containers:
          - command:
            - gke-test
            image: us.gcr.io/gke-test-ludwig125-2/gke-test
            imagePullPolicy: Always
            name: gke-test-container
            resources:
              requests:
                memory: 512Mi
          restartPolicy: Never
  schedule: '*/1 * * * *'

kubectl applyの結果、cronjobが登録されていることをget cronjobで確認

$kubectl get cronjob
NAME       SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
gke-test   */1 * * * *   False     0        45s             8m16s

ちょっと待つと最初の1分が来てjobが起動する

$kubectl get job
NAME                  COMPLETIONS   DURATION   AGE
gke-test-1568492880   0/1                      0s

podもCronで指定した時刻にできた

$kubectl get pod
NAME                        READY   STATUS              RESTARTS   AGE
gke-test-1568492880-5mfvr   0/1     ContainerCreating   0          19s
$kubectl get pod
NAME                        READY   STATUS      RESTARTS   AGE
gke-test-1568492880-5mfvr   0/1     Completed   0          37s

cronで指定したとおり、1分おきに動いていることがわかる

  • 時刻は9時間ずれているので、UTCらしい
$kubectl get pod
NAME                        READY   STATUS      RESTARTS   AGE
gke-test-1568492940-pz88m   0/1     Completed   0          2m33s
gke-test-1568493000-bbcl9   0/1     Completed   0          92s
gke-test-1568493060-qdsh2   0/1     Completed   0          32s

$kubectl logs gke-test-1568492940-pz88m
2019/09/14 20:29:18 Hello, World!
2019/09/14 20:29:18 cpu: 1
2019/09/14 20:29:18 GOMAXPROCS: 1
$kubectl logs gke-test-1568493000-bbcl9
2019/09/14 20:30:03 Hello, World!
2019/09/14 20:30:03 cpu: 1
2019/09/14 20:30:03 GOMAXPROCS: 1
$kubectl logs gke-test-1568493060-qdsh2
2019/09/14 20:31:03 Hello, World!
2019/09/14 20:31:03 cpu: 1
2019/09/14 20:31:03 GOMAXPROCS: 1
$

デフォルトでは成功したJobは直近の3つまで残っている https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/

The .spec.successfulJobsHistoryLimit and .spec.failedJobsHistoryLimit fields are optional. These fields specify how many completed and failed jobs should be kept. By default, they are set to 3 and 1 respectively

cronが起動できたことまで確認したので一旦消す。 このあとcircleciから自動でcronJobのデプロイを行う

$kubectl delete cronjob gke-test
cronjob.batch "gke-test" deleted

circleciでGKEにdeploy

上のkubernetesのデプロイをcircleciでできるか確認する。

検証の段階なのでビルド後にそのままデプロイジョブが起動されるようにしている。

前述のcircleci/config.ymlにdeploy部分を追加したものが以下

version: 2
jobs:
  build:
    (まえと同じ)
  deploy:
    environment:
      PROJECT_NAME: gke-test-ludwig125-2
      CLUSTER_NAME: gke-test-small-cluster
      COMPUTE_ZONE: us-west1-c
    docker:
      - image: google/cloud-sdk
    working_directory: /app
    steps:
      - checkout
      - setup_remote_docker:
          version: 18.06.0-ce
      - run:
          name: Setup CLOUD SDK
          command: |
            # base64 -i ignore non-alphabet characters
            echo $GCLOUD_SERVICE_KEY | base64 -di > ${HOME}/gcloud-service-key.json
            gcloud auth activate-service-account --key-file ${HOME}/gcloud-service-key.json
      - run:
          name: Setup GKE Cluster Infomation
          command: |
            gcloud config set project $PROJECT_NAME
            gcloud config set container/cluster $CLUSTER_NAME
            gcloud config set compute/zone ${COMPUTE_ZONE}
            gcloud container clusters get-credentials $CLUSTER_NAME
      - run:
          name: Install kustomize
          command: |
            opsys=linux  # or darwin, or windows
            curl -s https://api.github.com/repos/kubernetes-sigs/kustomize/releases |\
                grep browser_download |\
                grep $opsys |\
                cut -d '"' -f 4 |\
                grep kustomize_ |\
                grep -v tar.gz |\
                head -n 1 |\
                xargs curl -O -L
            mv kustomize_*_${opsys}_amd64 kustomize
            chmod u+x kustomize
      - deploy:
          name: Kustomize build and Apply
          command: |
            ./kustomize build ./k8s/ | /usr/bin/kubectl apply -f -      

workflows:
  version: 2
  master-build:
    jobs:
      - build:
          filters:
            branches:
              only: master
      - deploy:
          requires:
            - build

Install kustomize部分の説明

KustomizeのInstall方法は以下に書いてある kustomize/INSTALL.md at master · kubernetes-sigs/kustomize · GitHub

name: Install kustomize
command: |
            opsys=linux
            curl -s https://api.github.com/repos/kubernetes-sigs/kustomize/releases |\
                grep browser_download |\
                grep $opsys |\
                cut -d '"' -f 4 |\
                grep /kustomize/v |\
                sort | tail -n 1 |\
                xargs curl -O -L
            tar xzf ./kustomize_v*_${opsys}_amd64.tar.gz
            ./kustomize version

以上の設定でcircleci(中略)ジョブを実行すると 無事手動のときと同じcronjobの登録、起動が確認できた。

Slack Botの作成

ログに出すだけだと面白くないので、結果をslackに通知するようにしてみる。 この部分はただの趣味なので、クラスタの自動作成削除の本筋とは関係ない。

image

-u で最新を取ってくる

go get -u github.com/nlopes/slack

以下を参考にmain.goを修正

main.go

package main

import (
    "fmt"
    "log"
    "os"
    "runtime"
    "time"

    "github.com/nlopes/slack"
)

func main() {
    start := time.Now()

    res := fmt.Sprintf("cpu: %d\n", runtime.NumCPU())
    res += fmt.Sprintf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))

    token := mustGetenv("SLACK_TOKEN")
    channel := mustGetenv("SLACK_CHANNEL")
    if err := sendSlackMsg(token, channel, res, start); err != nil {
        log.Println(err)
    }
}

func mustGetenv(k string) string {
    v := os.Getenv(k)
    if v == "" {
        log.Fatalf("%s environment variable not set.", k)
    }
    log.Printf("%s environment variable set.", k)
    return v
}

func sendSlackMsg(token, channel, result string, start time.Time) error {
    api := slack.New(token)
    channelID, timestamp, err := api.PostMessage(channel, slack.MsgOptionText(createSlackMsg(start, result), false), slack.MsgOptionUsername("gke-test-Bot"), slack.MsgOptionIconEmoji(":sunny:"))
    if err != nil {
        return fmt.Errorf("failed to send message: %s", err)
    }
    log.Printf("Message successfully sent to channel %s at %s", channelID, timestamp)
    return nil
}

func createSlackMsg(start time.Time, res string) string {
    jst := time.FixedZone("Asia/Tokyo", 9*60*60)
    finish := time.Now()
    processingTime := time.Since(start).Truncate(time.Second)

    msg := "*gke-test が正常に終了しました。*\n"
    msg += fmt.Sprintf("起動時刻: %v\n", start.In(jst).Format("2006-01-02 15:04:05"))
    msg += fmt.Sprintf("終了時刻: %v\n", finish.In(jst).Format("2006-01-02 15:04:05"))
    msg += fmt.Sprintf("所要時間: %v\n\n", processingTime)
    msg += fmt.Sprintf("%s\n", res)
    return msg
}

slackのアイコンに使ったemojiは以下で適当に選ぶか自分でURLを設定する

🎁 Emoji cheat sheet for GitHub, Basecamp, Slack & more

ここではなんとなく:sunny:を選んだ。

明るい気持ちになる!

以下の通り環境変数としてTOKENとCHANNELを設定してローカルで実行してみる

$SLACK_TOKEN=<取ってきたTOKEN> SLACK_CHANNEL=<SLACKで通知したいチャネル> go run main.go
2019/09/16 06:56:17 SLACK_TOKEN environment variable set.
2019/09/16 06:56:17 SLACK_CHANNEL environment variable set.
2019/09/16 06:56:17 Message successfully sent to channel <チャネル> at 1568584577.001500

こんな感じに送れた image

GKEにdeploy

ローカルでcircleciジョブを起動するプログラムができたので、これをGKEにデプロイする

GKEクラスタ作成

[~/go/src/github.com/gke-test] $gcloud container clusters create $CLUSTER_NAME --preemptible --machine-type=g1-small --num-nodes 3 --disk-size 10 --zone $COMPUTE_ZONE --enable-autoscaling --min-nodes=1 --max-nodes=3
中略
Created [https://container.googleapis.com/v1/projects/gke-test-ludwig125-2/zones/us-west1-c/clusters/gke-test-small-cluster].
To inspect the contents of your cluster, go to: https://console.cloud.google.com/kubernetes/workload_/gcloud/us-west1-c/gke-test-small-cluster?project=gke-test-ludwig125-2
kubeconfig entry generated for gke-test-small-cluster.
NAME                    LOCATION    MASTER_VERSION  MASTER_IP      MACHINE_TYPE  NODE_VERSION  NUM_NODES  STATUS
gke-test-small-cluster  us-west1-c  1.13.7-gke.8    35.247.70.150  g1-small      1.13.7-gke.8  3          RUNNING

GKEにローカルからデプロイ。

環境変数を渡してkustomize buildする

[~/go/src/github.com/gke-test] $SLACK_TOKEN=<取ってきたTOKEN> SLACK_CHANNEL=<SLACKで通知したいチャネル>  kustomize build k8s | kubectl apply -f -
secret/slack-info created
cronjob.batch/gke-test created

ちょっと待つとちゃんとできている

[~/go/src/github.com/gke-test] $kubectl get pod
NAME                        READY   STATUS      RESTARTS   AGE
gke-test-1568607120-hdq5z   0/1     Completed   0          2m16s
gke-test-1568607180-g9ctx   0/1     Completed   0          76s
gke-test-1568607240-827cq   0/1     Completed   0          15s
[~/go/src/github.com/gke-test] $k

circlecleでデプロイする

上の修正したmain.goをそのままcircleciでbuildしようとしても以下のように言われてしまう

docker build 時のログ

Step 4/5 : RUN go install github.com/ludwig125/gke-test
---> Running in 0cdddbf7ff52
main.go:10:2: cannot find package "github.com/nlopes/slack" in any of:
    /usr/local/go/src/github.com/nlopes/slack (from $GOROOT)
    /go/src/github.com/nlopes/slack (from $GOPATH)
The command '/bin/sh -c go install github.com/ludwig125/gke-test' returned a non-zero code: 1
Exited with code 1

slackパッケージが見つからないので依存解決にgo moduleを使う

go mod init を使用して現在のディレクトリをモジュールのルートにする

[~/go/src/github.com/gke-test] $go mod init
go: creating new go.mod: module github.com/gke-test

これで以下のファイルが作られる

[~/go/src/github.com/gke-test] $cat go.mod
module github.com/gke-test

go 1.13
[~/go/src/github.com/gke-test] $

test, build またはinstallのどれかを実行するとslackパッケージがgo.modに追加される

[~/go/src/github.com/gke-test] $go install
go: finding github.com/nlopes/slack v0.6.0
go: downloading github.com/nlopes/slack v0.6.0
go: extracting github.com/nlopes/slack v0.6.0

go.modの中身を見ると以下のようになっている

[~/go/src/github.com/gke-test] $cat go.mod
module github.com/gke-test

go 1.13

require github.com/nlopes/slack v0.6.0
[~/go/src/github.com/gke-test] $

Dockerfileを直す

  • Dockerfileにgo moduleのための設定を追加する

最初に作ったDockerfileはこれ

FROM golang:1.13-alpine
# for go mod download
RUN apk add --update --no-cache ca-certificates git

WORKDIR /go/src/github.com/ludwig125/gke-test
COPY go.mod .
COPY go.sum .

RUN go mod download
COPY . .

# go.mod path 'module github.com/gke-test'
RUN go install github.com/gke-test
CMD ["gke-test"]

これでも動くし、プログラムや設定が間違っていた時に検証しやすい。 ただ、格安で使うことを考えると問題がある

Docker Image の大きさを小さくすることで、GKEのデプロイ時にDocker Imageをプルするときの通信費を抑えることができる これは意外とコストとして大きいので、いろいろと参考にして、Imageは以下のscratchを使った

  • ただ、scratchは何にも入っていないので、うまく動かないときにコンテナに入っていろいろ確認するのが難しい
  • 最初は検証しやすいalpineで確認して、全部うまくいったらscratchベースで作るようにしたほうが開発が楽

軽さを追求して最終的に落ち着いたDockerfileが以下

FROM golang:1.13-alpine as builder

RUN mkdir /gke-test
WORKDIR /gke-test

# Install git + SSL ca certificates.
# Git is required for fetching the dependencies.
# Ca-certificates is required to call HTTPS endpoints.
#RUN apk add --update --no-cache ca-certificates git && update-ca-certificates
RUN apk add --update --no-cache ca-certificates

# COPY go.mod and go.sum files to the workspace
COPY go.mod .
COPY go.sum .

# Get dependancies - will also be cached if we won't change mod/sum
RUN go mod download
# COPY the source code as the last step
COPY . .

# Build the binary
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -installsuffix cgo -o /go/bin/gke-test

# Second step to build minimal image
FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=builder /go/bin/gke-test /go/bin/gke-test
ENTRYPOINT ["/go/bin/gke-test"]
  • 工夫して直している間に、ENTRYPOINTをCMD ["gke-test"]からENTRYPOINT ["/go/bin/gke-test"]にしてたので注意

Dockerfile作成の参考

実際にDockerImageがどのくらい変わったか

自分の場合は、golang:1.13-alpineをベースに作った時イメージは390MBくらいだったのが、 scratchベースでは8MBくらいになった。

自分の構成では毎回のCronのたびにImageをPullするので、通信費に結構な差が生じるし、デプロイ速度にもかかわる

cronjob側のパスを修正

DockerのENTRYPOINTを変えたので、cronjobのcommandは以下になる

          containers:
          - name: gke-test-container
            image: us.gcr.io/gke-test-ludwig125-2/gke-test
            imagePullPolicy: Always
            command: ["/go/bin/gke-test"]

go.mod対応のDockerfileを使って改めてcirclecleでデプロイ

これでもう一度build & deployのジョブを回すと無事成功 1分ごとに起動するcronからいっぱいslackに通知がくるようになった

image

通知がうっとうしいのでいったんcronjobを消す

$kubectl get cronjob
NAME       SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
gke-test   */1 * * * *   False     0        16s             6m40s
$kubectl delete cronjob gke-test

cronjob.batch "gke-test" deleted

circleci API

まずは試しにGKEclusterのlistを見るだけのcircleciジョブを作り、それをcurlで起動させてみる

以下を参考に、circleciのAPI用のTOKENを作成する

TOKENを作るのはここ

APIで実行するためのジョブを作成する

version: 2
jobs:
(中略)
  list_gke_cluster:
    working_directory: /app
    environment:
      PROJECT_NAME: gke-test-ludwig125-2
    docker:
      - image: google/cloud-sdk:alpine
    steps:
      - checkout
      - setup_remote_docker
      - run:
          name: Set gcloud
          command: |
              echo $GCLOUD_SERVICE_KEY | base64 -d > ${HOME}/service_account.json
              gcloud auth activate-service-account --key-file ${HOME}/service_account.json
              gcloud config set project $PROJECT_NAME
      - run:
          name: List GKE Cluster
          command: gcloud container clusters list

端末からジョブを起動する

CIRCLE_API_USER_TOKEN=<取得したTOKEN>
curl -u ${CIRCLE_API_USER_TOKEN}: -d build_parameters[CIRCLE_JOB]=list_gke_cluster https://circleci.com/api/v1.1/project/github/ludwig125/gke-test/tree/master

上のcurlは以下と同じことをしている(どちらがしっくりくる書き方かは好みだと思う)

curl -XPOST https://circleci.com/api/v1.1/project/github/ludwig125/gke-test/tree/master --data "build_parameters[CIRCLE_JOB]=list_gke_cluster" --user "${CIRCLE_API_USER_TOKEN}:"

またはJSONであることを明記するなら以下のような書き方もできる

  • この書き方はこの後書くgoプログラムの内容に近いのでより理解がしやすい
curl -XPOST https://circleci.com/api/v1.1/project/github/ludwig125/gke-test/tree/master --user "${CIRCLE_API_USER_TOKEN}:" --header "Content-Type: application/json" -d '{
  "build_parameters": {
    "CIRCLE_JOB": "list_gke_cluster"
  }
}'

成功すると以下のようにGKEのclusterが取得できたことがわかる image

#!/bin/bash -eo pipefail
gcloud container clusters list
NAME                    LOCATION    MASTER_VERSION  MASTER_IP      MACHINE_TYPE  NODE_VERSION  NUM_NODES  STATUS
gke-test-small-cluster  us-west1-c  1.13.7-gke.8    35.199.173.53  g1-small      1.13.7-gke.8  2          RUNNING

circleci APIをgoプログラムの中で実行する

curlで動作確認できたので、次に同じことをgoでやってcircleciジョブを起動させてみる

circleciのEnvironment Variables に以下を追加する

  • CIRCLE_API_USER_TOKEN
    • 中身は先ほど作ったcircleci APIのTOKEN

検証用に以下のようなgoプログラムを作成する

circleci検証用main.go

package main

import (
    "bytes"
    "fmt"
    "log"
    "net/http"
    "os"
)

func main() {
    token := mustGetenv("CIRCLE_API_USER_TOKEN")
    defer func() {
        err := requestCircleci(token, "list_gke_cluster")
        if err != nil {
            log.Fatalf("failed to requestCircleci: %v", err)
        }
        log.Println("requestCircleci successfully")
    }()
    log.Println("do task")
}

func mustGetenv(k string) string {
    v := os.Getenv(k)
    if v == "" {
        log.Fatalf("%s environment variable not set.", k)
    }
    log.Printf("%s environment variable set.", k)
    return v
}

func requestCircleci(token, job string) error {
    // 参考
    // https://circleci.com/docs/ja/2.0/api-job-trigger/
    // https://circleci.com/docs/api/#trigger-a-new-job
    client := &http.Client{}
    circleciURL := "https://circleci.com/api/v1.1/project/github/ludwig125/gke-test/tree/master"
    j := fmt.Sprintf(`{"build_parameters": {"CIRCLE_JOB": "%s"}}`, job)
    req, err := http.NewRequest("POST", circleciURL, bytes.NewBuffer([]byte(j)))
    req.SetBasicAuth(token, "")
    req.Header.Set("Content-Type", "application/json")
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // circleci APIを呼び出すと、201 Created が返ってくるのでチェック
    if resp.StatusCode != 201 {
        return fmt.Errorf("status code Error. %v", resp.StatusCode)
    }

    // レスポンス本文が見たい場合はここのコメントアウトを外す
    // body, err := ioutil.ReadAll(resp.Body)
    // fmt.Println(string(body))
    return nil
}

うまく実行されるとこのようになる

$CIRCLE_API_USER_TOKEN=<取得したTOKEN> go run main.go

2019/09/18 05:52:23 CIRCLE_API_USER_TOKEN environment variable set.
2019/09/18 05:52:24 requestCircleci successfully

circleciの結果 image

うまく目的のジョブが起動しないときは、responseの内容でbuild_parametersが空になっていないか確認してみる(自分はここで結構悩んだ)

失敗したときのResoponse内容

build_parameters" : {}`

成功するときのResoponse内容

  "build_parameters" : {
    "CIRCLE_JOB" : "list_gke_cluster"
  },

ちなみにgo のJSONの扱いの話になるけど、 上のプログラムは以下のようにjson.Marshalを使った方法でもいい。 今回は階層の深いJSONパラメータではないので、単純な文字列結合という手法を使った

// BuildParams is params for circleci API
type BuildParams struct {
    CircleciJobs CircleciJob `json:"build_parameters"`
}

// CircleciJob designate circleci job name
type CircleciJob struct {
    JobName string `json:"CIRCLE_JOB"`
}

func requestCircleci(token, job string) error {
    client := &http.Client{}

    params := BuildParams{
        CircleciJobs: CircleciJob{JobName: job},
    }
    jsonBytes, err := json.Marshal(params)
    if err != nil {
        return fmt.Errorf("failed to Marshal: %v", err)
    }
    circleciURL := "https://circleci.com/api/v1.1/project/github/ludwig125/gke-test/tree/master"
    req, err := http.NewRequest("POST", circleciURL, bytes.NewBuffer(jsonBytes))
    req.SetBasicAuth(token, "")
    req.Header.Set("Content-Type", "application/json")
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != 201 {
        return fmt.Errorf("status code Error. %v", resp.StatusCode)
    }

    return nil
}

最終的に作ったもの

いろいろ工夫して最終的に作った構成がこれ

ファイルの構成

$tree -a
.
├── .circleci
│   └── config.yml
├── Dockerfile
├── circleci.go
├── go.mod
├── go.sum
├── k8s
│   ├── cronjob.yaml
│   └── kustomization.yaml
├── main.go
└── slack.go

.circleci/config.yml

version: 2
jobs:
  build:
    environment:
      PROJECT_NAME: gke-test-ludwig125-2
      IMAGE_NAME: gke-test
    docker:
      - image: google/cloud-sdk
    working_directory: /go/src/github.com/ludwig125/gke-test
    steps:
      - checkout
      - setup_remote_docker:
          version: 18.06.0-ce
      - run:
          name: Setup CLOUD SDK
          command: |
            # base64 -i ignore non-alphabet characters
            echo $GCLOUD_SERVICE_KEY | base64 -di > ${HOME}/gcloud-service-key.json
            gcloud auth activate-service-account --key-file ${HOME}/gcloud-service-key.json
            gcloud --quiet auth configure-docker
      - run:
          name: Docker Build & Push
          command: |
            docker build -t us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}:${CIRCLE_BUILD_NUM} .
            docker tag us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}:${CIRCLE_BUILD_NUM} us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}:latest
            if [ -n "${CIRCLE_TAG}" ]; then
              docker tag us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}:${CIRCLE_BUILD_NUM} us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}:${CIRCLE_TAG}
            fi
            docker push us.gcr.io/${PROJECT_NAME}/${IMAGE_NAME}
  deploy:
    environment:
      PROJECT_NAME: gke-test-ludwig125-2
      CLUSTER_NAME: gke-test-small-cluster
      COMPUTE_ZONE: us-west1-c
    docker:
      - image: google/cloud-sdk
    working_directory: /app
    steps:
      - checkout
      - setup_remote_docker:
          version: 18.06.0-ce
      - run:
          name: Setup CLOUD SDK
          command: |
            # base64 -i ignore non-alphabet characters
            echo $GCLOUD_SERVICE_KEY | base64 -di > ${HOME}/gcloud-service-key.json
            gcloud auth activate-service-account --key-file ${HOME}/gcloud-service-key.json
      - run:
          name: Setup GKE Cluster Infomation
          command: |
            gcloud config set project $PROJECT_NAME
            gcloud config set container/cluster $CLUSTER_NAME
            gcloud config set compute/zone ${COMPUTE_ZONE}
            gcloud container clusters get-credentials $CLUSTER_NAME
      - run:
          name: Create secret
          command: |
            echo -n ${SLACK_TOKEN} > ./k8s/slack_token.txt
            echo -n ${SLACK_CHANNEL} > ./k8s/slack_channel.txt
            echo -n ${CIRCLE_API_USER_TOKEN} > ./k8s/circleci_token.txt
      - run:
          name: Install kustomize
          # ref. https://github.com/kubernetes-sigs/kustomize/blob/master/docs/INSTALL.md#quickly-curl-the-latest-binary
          command: |
            opsys=linux
            curl -s https://api.github.com/repos/kubernetes-sigs/kustomize/releases |\
                grep browser_download |\
                grep $opsys |\
                cut -d '"' -f 4 |\
                grep /kustomize/v |\
                sort | tail -n 1 |\
                xargs curl -O -L
            tar xzf ./kustomize_v*_${opsys}_amd64.tar.gz
            ./kustomize version
      - deploy:
          name: Kustomize build and Apply
          command: |
            ./kustomize build ./k8s/ | /usr/bin/kubectl apply -f -
  list_gke_cluster:
    working_directory: /app
    environment:
      PROJECT_NAME: gke-test-ludwig125-2
    docker:
      - image: google/cloud-sdk:alpine
    steps:
      - checkout
      - setup_remote_docker
      - run:
          name: Set gcloud
          command: |
              # cloud-sdk:alpineの場合はbase64 -iがない
              echo $GCLOUD_SERVICE_KEY | base64 -d > ${HOME}/service_account.json
              gcloud auth activate-service-account --key-file ${HOME}/service_account.json
              gcloud config set project $PROJECT_NAME
      - run:
          name: List GKE Cluster
          command: gcloud container clusters list
  delete_gke_cluster:
    working_directory: /app
    environment:
      PROJECT_NAME: gke-test-ludwig125-2
      CLUSTER_NAME: gke-test-small-cluster
      COMPUTE_ZONE: us-west1-c
    docker:
      - image: google/cloud-sdk # kubectlを使うのでalpineではない
    steps:
      - checkout
      - setup_remote_docker
      - run:
          name: Set gcloud
          command: |
              echo $GCLOUD_SERVICE_KEY | base64 -di > ${HOME}/service_account.json
              gcloud auth activate-service-account --key-file ${HOME}/service_account.json
              gcloud config set project $PROJECT_NAME
              gcloud config set container/cluster $CLUSTER_NAME
              gcloud config set compute/zone ${COMPUTE_ZONE} # delete時にもzone or regionの指定が必要
              gcloud container clusters get-credentials $CLUSTER_NAME # kubectlを使うので必要
      - run:
          name: Delete GKE Cluster
          command: |
              # clusterが存在する場合のみdelete
              if [ `gcloud container clusters list | grep $CLUSTER_NAME | wc -l` == 1 ]; then
                # clusterが存在してもStop中のこともあるのでRUNNINGのときだけdelete
                if [ `gcloud container clusters describe $CLUSTER_NAME | grep 'RUNNING' | wc -l` -gt 0 ]; then
                  # deleteには時間がかかるので先にcronjobを消す
                  kubectl delete cronjob gke-test
                  # --quiet をつけないと削除するかどうかy/nの入力を求める表示が出る
                  gcloud --quiet container clusters delete $CLUSTER_NAME
                fi
              else
                echo "$CLUSTER_NAME cluster does not exist"
              fi
      - run:
          name: Check GKE Cluster
          command: gcloud container clusters list
  create_gke_cluster:
    working_directory: /app
    environment:
      PROJECT_NAME: gke-test-ludwig125-2
      CLUSTER_NAME: gke-test-small-cluster
      COMPUTE_ZONE: us-west1-c
      #MACHINE_TYPE: n1-standard-4
      MACHINE_TYPE: g1-small
    docker:
      - image: google/cloud-sdk:alpine
    steps:
      - checkout
      - setup_remote_docker
      - run:
          name: Set gcloud
          command: |
              # cloud-sdk:alpineの場合はbase64 -iがない
              echo $GCLOUD_SERVICE_KEY | base64 -d > ${HOME}/service_account.json
              gcloud auth activate-service-account --key-file ${HOME}/service_account.json
              gcloud config set project $PROJECT_NAME
              gcloud config set compute/zone ${COMPUTE_ZONE}
      - run:
          name: Create GKE Cluster
          no_output_timeout: 20m # これを防ぐ:Too long with no output (exceeded 10m0s)
          command: |
              if [ `gcloud container clusters list | grep $CLUSTER_NAME | wc -l` == 1 ]; then
                # clusterが存在する場合、ERRORになっていないか確認
                if [ `gcloud container clusters describe $CLUSTER_NAME | grep 'ERROR' | wc -l` != 0 ]; then
                  # うまく作れていないときは消してからもう一度作る
                  gcloud --quiet container clusters delete $CLUSTER_NAME
                  gcloud --quiet container clusters create $CLUSTER_NAME \
                  --machine-type=$MACHINE_TYPE --disk-size 10 --zone $COMPUTE_ZONE \
                  --num-nodes=2
                fi
              elif [ `gcloud container clusters list | grep $CLUSTER_NAME | wc -l` == 0 ]; then
                # clusterが存在しない場合はcreate
                # --quiet をつけないと作成するかどうかy/nの入力を求める表示が出る
                gcloud --quiet container clusters create $CLUSTER_NAME \
                --machine-type=$MACHINE_TYPE --disk-size 10 --zone $COMPUTE_ZONE \
                --num-nodes=2
              else
                echo "$CLUSTER_NAME cluster already exists"
              fi
      - run:
          name: Check GKE Cluster
          command: gcloud container clusters list

workflows:
  version: 2
  master-build:
    jobs:
      - build:
          filters:
            branches:
              only: master
      # buildのあとにdeployを実行したい場合は以下を有効にする
      # - deploy:
      #     requires:
      #       - build
  # cronで定期実行させたい場合は以下を有効にする
   create-deploy:
     triggers:
       - schedule:
           cron: "10 0-6 * * *" # 9-15 in JST
           filters:
             branches:
               only:
                 - master
     jobs:
       - create_gke_cluster
       - deploy:
           requires:
             - create_gke_cluster
  • GKEはg1-smallでは最低でもVM2台ないとリソースが足りずに起動ができなかったので、 --num-nodes=2 とした
  • 日本時間の9時から15時にかけて毎時10分に起動するようにした
    • 時間に特に意味はない。毎日複数回起動できるか確認したかった

Dockerfile

FROM golang:1.13-alpine as builder

RUN mkdir /gke-test
WORKDIR /gke-test

# Install git + SSL ca certificates.
# Git is required for fetching the dependencies.
# Ca-certificates is required to call HTTPS endpoints.
RUN apk add --update --no-cache ca-certificates

# COPY go.mod and go.sum files to the workspace
COPY go.mod .
COPY go.sum .

# Get dependancies - will also be cached if we won't change mod/sum
RUN go mod download
# COPY the source code as the last step
COPY . .

# Build the binary
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -installsuffix cgo -o /go/bin/gke-test

# Second step to build minimal image
FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=builder /go/bin/gke-test /go/bin/gke-test
ENTRYPOINT ["/go/bin/gke-test"]

cronjob.yaml

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: gke-test
  labels:
    app: gke-test
spec:
  # cronJob 参考
  # https://en.wikipedia.org/wiki/Cron
  # https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/
  # https://kubernetes.io/ja/docs/concepts/workloads/controllers/cron-jobs/
  schedule: "*/5 * * * *" # 5分おきに起動。頻繁すぎるとcircleciのジョブが間に合わずに複数回実行されてしまう
  concurrencyPolicy: Forbid # 前のCronが動いていたら動作しない
  jobTemplate:
    spec:
      template:
        metadata:
          labels:
            app: gke-test
        spec:
          containers:
          - name: gke-test-container
            image: us.gcr.io/gke-test-ludwig125-2/gke-test
            imagePullPolicy: Always
            command: ["/go/bin/gke-test"]
            resources:
              requests:
                memory: 512Mi
            env:
            - name: SLACK_TOKEN
              valueFrom:
                secretKeyRef:
                  name: slack-info
                  key: token
            - name: SLACK_CHANNEL
              valueFrom:
                secretKeyRef:
                  name: slack-info
                  key: channel
            - name: CIRCLE_API_USER_TOKEN
              valueFrom:
                secretKeyRef:
                  name: circleci-info
                  key: token
          restartPolicy: Never # Cron失敗時にコンテナを再起動しない

kustomization.yaml

kind: Kustomization
apiVersion: kustomize.config.k8s.io/v1beta1
commonLabels:
  app: gke-test

resources:
- cronjob.yaml

generatorOptions:
  disableNameSuffixhash: true
secretGenerator:
- name: slack-info
  files:
  - token=slack_token.txt
  - channel=slack_channel.txt
- name: circleci-info
  files:
  - token=circleci_token.txt

main.go

package main

import (
    "fmt"
    "log"
    "os"
    "runtime"
    "time"
)

func main() {
    start := time.Now()
    ciToken := mustGetenv("CIRCLE_API_USER_TOKEN")
    defer func() {
        err := requestCircleci(ciToken, "delete_gke_cluster")
        if err != nil {
            log.Fatalf("failed to requestCircleci: %v", err)
        }
        log.Println("requestCircleci successfully")
    }()
    slToken := mustGetenv("SLACK_TOKEN")
    channel := mustGetenv("SLACK_CHANNEL")

    // cronで処理をさせたい内容
    // ここでは結果をslackメッセージに渡している
    res := fmt.Sprintf("cpu: %d\n", runtime.NumCPU())
    res += fmt.Sprintf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))

    // 処理が終了したら成功メッセージをslackに通知する
    if err := sendSlackMsg(slToken, channel, res, start); err != nil {
        log.Println(err)
    }
}

func mustGetenv(k string) string {
    v := os.Getenv(k)
    if v == "" {
        log.Fatalf("%s environment variable not set.", k)
    }
    log.Printf("%s environment variable set.", k)
    return v
}

circleci.go

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

func requestCircleci(token, job string) error {
    // 参考
    // https://circleci.com/docs/ja/2.0/api-job-trigger/
    // https://circleci.com/docs/api/#trigger-a-new-job
    client := &http.Client{}
    circleciURL := "https://circleci.com/api/v1.1/project/github/ludwig125/gke-test/tree/master"
    j := fmt.Sprintf(`{"build_parameters": {"CIRCLE_JOB": "%s"}}`, job)
    req, err := http.NewRequest("POST", circleciURL, bytes.NewBuffer([]byte(j)))
    req.SetBasicAuth(token, "")
    req.Header.Set("Content-Type", "application/json")
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // circleci APIを呼び出すと、201 Created が返ってくるのでチェック
    if resp.StatusCode != 201 {
        return fmt.Errorf("status code Error. %v", resp.StatusCode)
    }

    // レスポンス本文が見たい場合はここのコメントアウトを外す
    // body, err := ioutil.ReadAll(resp.Body)
    // fmt.Println(string(body))
    return nil
}

slack.go

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nlopes/slack"
)

func sendSlackMsg(token, channel, result string, start time.Time) error {
    api := slack.New(token)
    channelID, timestamp, err := api.PostMessage(channel, slack.MsgOptionText(createSlackMsg(start, result), false), slack.MsgOptionUsername("gke-test-Bot"), slack.MsgOptionIconEmoji(":sunny:"))
    if err != nil {
        return fmt.Errorf("failed to send message: %s", err)
    }
    log.Printf("Message successfully sent to channel %s at %s", channelID, timestamp)
    return nil
}

func createSlackMsg(start time.Time, res string) string {
    jst := time.FixedZone("Asia/Tokyo", 9*60*60)
    finish := time.Now()
    processingTime := time.Since(start).Truncate(time.Second)

    msg := "*gke-test が正常に終了しました。*\n"
    msg += fmt.Sprintf("起動時刻: %v\n", start.In(jst).Format("2006-01-02 15:04:05"))
    msg += fmt.Sprintf("終了時刻: %v\n", finish.In(jst).Format("2006-01-02 15:04:05"))
    msg += fmt.Sprintf("所要時間: %v\n\n", processingTime)
    msg += fmt.Sprintf("%s\n", res)
    return msg
}

起動例

Slackに送られた通知を見るとこんな感じ

image

すべての料金  |  Compute Engine ドキュメント  |  Google Cloud のスペックを参考に、MACHINE_TYPE: n1-standard-4 にすれば以下のようになった。 ちゃんとCPU4になっている

image

毎日の起動でかかる金額

一日あたり、7回起動しても4円という素晴らしい金額になった image

左の方の山は、試行錯誤していた際に、GKEクラスタを丸一日起動させっぱなしにしていた時の料金で、このときは最高81円だった image

それが4円!

一日当たりにかかる料金の内訳

レポートの期間を1日にして、グループ条件をSKUにすると使っているリソースの内訳がわかりやすい

image

上のレポートで課金の発生している部分を以下の表にまとめた。

SKU プロダクト SKU ID 使用
Small Instance with 1 VCPU running in Americas Compute Engine 82AF-89FC-240D 1.55 hour
Multi-Regional Storage Asia Cloud Storage E653-0A40-3B69 0.02 gibibyte month
Multi-Regional Storage US Cloud Storage 0D5D-6E23-4250 0.01 gibibyte month

Small Instance with 1 VCPU running in Americas

VMの料金を表す。1.55 hour使用したことになっている。

毎回クラスタを作る際に --num-nodes=2 を設定してg1-smallのマシンを2台作っているので、1.55時間というのは2台のVMの稼働時間を合わせたものとなっている。 つまり、GKEクラスタが存在する時間は1日あたり 1.55*60/2=46.5分間 ということになる

毎日7回Cronを実行していることを考えると、 GKEクラスタが生成されて削除されるまでの平均の時間 は、46.5/7=6.64分間 ということになる。

この6.64分間というのは、GKEクラスタが生まれてcronjobがそれを消すまでの時間と大体合っていそうだ。 (大体毎時10分から作り始めて15分ごろにcronが起動しているので)

すべての料金  |  Compute Engine ドキュメント  |  Google Cloud を見ると、 g1-smallの1時間あたりの料金は0.03ドルなので、1ドル107円とすると、 1.55hour*0.03ドル*107=4.9755円となる。

マシンタイプ 仮想 CPU 数 メモリ 料金(米ドル) プリエンプティブル料金(米ドル)
g1-small 1 1.70GB $0.03 $0.01

課金額は4円となっているので、ちょっと計算が合わないけど、やすいからいいや。

もしこれが24時間フル稼働しているサーバだとしたら、 24/1.55 * 4 = 62円 くらいかかっていたはずだ。 (実際には一ヶ月フルに使うと継続利用料金が適用されて1日あたりはもう少しやすくなるはずだけど)それが4円で済んだ計算になる。

Multi-Regional Storage Asia

たぶんこれが、Slackやcircleciの通信費

余談だけど、最初に作った時はDockerImageをgolang alpineにして、かつGCRをうっかりasia.gcr.ioにしてしまった。

こうしてしまったことで、デプロイするたびにアジアのGCRから北アメリカのGKEクラスタにDocker ImageをPullする必要が生じていた。 また、このときはDocker Imageがscratchベースではなくgolang-alpineベースだったためImageが大きかったこともあり、通信費が一日12円もかかっていた。 ちょっと工夫するだけで、これを0円に抑えることができた。

参考:イメージの push と pull  |  Container Registry  |  Google Cloud

us.gcr.io は米国でイメージをホストしますが、その場所は、gcr.io によってホストされるイメージからは独立したストレージ バケットです。
eu.gcr.io は、欧州連合でイメージをホストします。
asia.gcr.io は、アジアでイメージをホストします

Multi-Regional Storage US

たぶんこれがGCRとGKEクラスタの通信費

まとめ

circleciのスケジュールとAPIを使って、GKEを格安で使う方法が確立できた

この構成を応用して安くGKEを活用できそう。

Go言語で作ったプログラムをKubernetes のコンテナにおいて実行する(デバッグ用)

概要

Kubernetes(またはDocker)のコンテナ内でGoのプログラムをデバッグする時に使う方法の自分用のメモ

活用ケース

Kubernetesにデプロイしたけど、Goのプログラムが予想通り動いていない! というときに、またDocker ImageのbuildしてからKubernetes deployという手順を踏むのは時間がかかっていやだった。

直接Kubernetesのコンテナに入って、プログラムを修正させつつ動作確認できたら楽。

方法1.コンテナにGoの実行環境がある場合

Docker のbase Imageがgolangだったりgolang-alpineだったら直接~.goプログラムを実行できる

例えば、開発環境で作ったmain.goをコンテナに持って行って実行したいのであれば以下でいい (alpineを想定しているので/bin/ashを指定している。通常のLinuxなら/bin/bashとか)

$kubectl cp main.go <pod名>:/

$kubectl exec -it <pod名>  /bin/ash

/go# go run main.go

方法2.コンテナにGoの実行環境がない場合

コンテナにGo言語の環境がないとき、コンテナ内でGo言語の開発環境を整えるのは面倒なので、Linuxで作ったバイナリをコンテナに送って実行してみるとデバッグが楽だった

$GOOS=linux GOARCH=amd64 go build -o hoge

$kubectl cp ./hoge <pod名>:/

$kubectl exec -it <pod名>  /bin/ash

/go# ./hoge

おまけ

自分はいちいちpod名を調べるのが面倒なので、以下のように指定している。 こうすればPod名が変わっても、Ctrl+Rの履歴からいつも同じコマンドを実行するだけでできるので楽

$kubectl cp main.go `kubectl get pods | awk '{print $1}' | tail -1`:/
$kubectl exec -it `kubectl get pods | awk '{print $1}'`  /bin/ash

また、Pod内にコンテナが複数存在する場合は以下のようにコンテナ名を指定する必要がある

$kubectl cp ./sample/sample.go `kubectl get pods | awk '{print $1}' | tail -1`:/  --container <container名> 

おわり

go言語のBenchmarkTestメモ

概要

いろいろとベンチマークを取った時のメモ

参考:

【題材1】sliceのappend

参考: Bad Go: not sizing slices. Why is it better to set the capacity on… | by Phil Pearl | The Startup | Medium

以下(大体)処理速度が遅い順番に記載

  1. BenchmarkTest1: 単純なappend
  2. BenchmarkTest2: 事前に追加するデータの個数分長さを確保してからappend
  3. BenchmarkTest3: 事前に追加するデータの個数分容量を確保してからappend
  4. BenchmarkTest4: 事前に追加するデータの個数分長さを確保してから要素を指定して代入
package main

import (
    "fmt"
    "testing"
)

var N = 1000000

func BenchmarkTest1(b *testing.B) {
    b.ResetTimer()
    list := []string{}
    for i := 0; i < N; i++ {
        list = append(list, fmt.Sprintf("%d", i))
    }
}

func BenchmarkTest2(b *testing.B) {
    b.ResetTimer()
    list := make([]string, N)
    for i := 0; i < N; i++ {
        list = append(list, fmt.Sprintf("%d", i))
    }
}

func BenchmarkTest3(b *testing.B) {
    b.ResetTimer()
    list := make([]string, 0, N)
    for i := 0; i < N; i++ {
        list = append(list, fmt.Sprintf("%d", i))
    }
}

func BenchmarkTest4(b *testing.B) {
    b.ResetTimer()
    list := make([]string, N)
    for i := 0; i < N; i++ {
        list[i] = fmt.Sprintf("%d", i)
    }
}

結果

Ubuntuでやったとき

  • 結果の見方
左から順に、
ループが実行された回数
1ループごとの所要時間
1ループごとのアロケーションされたバイト数
1ループごとのアロケーション回数
$go test -bench . -benchmem -count=4
goos: linux
goarch: amd64

BenchmarkTest1  1000000000               0.269 ns/op           0 B/op          0 allocs/op
BenchmarkTest1  1000000000               0.277 ns/op           0 B/op          0 allocs/op
BenchmarkTest1  1000000000               0.294 ns/op           0 B/op          0 allocs/op
BenchmarkTest1  1000000000               0.297 ns/op           0 B/op          0 allocs/op
BenchmarkTest2  1000000000               0.240 ns/op           0 B/op          0 allocs/op
BenchmarkTest2  1000000000               0.272 ns/op           0 B/op          0 allocs/op
BenchmarkTest2  1000000000               0.251 ns/op           0 B/op          0 allocs/op
BenchmarkTest2  1000000000               0.266 ns/op           0 B/op          0 allocs/op
BenchmarkTest3  1000000000               0.129 ns/op           0 B/op          0 allocs/op
BenchmarkTest3  1000000000               0.101 ns/op           0 B/op          0 allocs/op
BenchmarkTest3  1000000000               0.101 ns/op           0 B/op          0 allocs/op
BenchmarkTest3  1000000000               0.0998 ns/op          0 B/op          0 allocs/op
BenchmarkTest4  1000000000               0.0965 ns/op          0 B/op          0 allocs/op
BenchmarkTest4  1000000000               0.103 ns/op           0 B/op          0 allocs/op
BenchmarkTest4  1000000000               0.102 ns/op           0 B/op          0 allocs/op
BenchmarkTest4  1000000000               0.101 ns/op           0 B/op          0 allocs/op
PASS
ok           43.708s

Macでやったとき

----result----
$go test -bench . -benchmem -count=4
goos: darwin
goarch: amd64

BenchmarkTest1-8        1000000000               0.159 ns/op           0 B/op          0 allocs/op
BenchmarkTest1-8        1000000000               0.174 ns/op           0 B/op          0 allocs/op
BenchmarkTest1-8        1000000000               0.174 ns/op           0 B/op          0 allocs/op
BenchmarkTest1-8        1000000000               0.178 ns/op           0 B/op          0 allocs/op
BenchmarkTest2-8        1000000000               0.195 ns/op           0 B/op          0 allocs/op
BenchmarkTest2-8        1000000000               0.197 ns/op           0 B/op          0 allocs/op
BenchmarkTest2-8        1000000000               0.198 ns/op           0 B/op          0 allocs/op
BenchmarkTest2-8        1000000000               0.195 ns/op           0 B/op          0 allocs/op
BenchmarkTest3-8        1000000000               0.108 ns/op           0 B/op          0 allocs/op
BenchmarkTest3-8        1000000000               0.102 ns/op           0 B/op          0 allocs/op
BenchmarkTest3-8        1000000000               0.108 ns/op           0 B/op          0 allocs/op
BenchmarkTest3-8        1000000000               0.102 ns/op           0 B/op          0 allocs/op
BenchmarkTest4-8        1000000000               0.104 ns/op           0 B/op          0 allocs/op
BenchmarkTest4-8        1000000000               0.105 ns/op           0 B/op          0 allocs/op
BenchmarkTest4-8        1000000000               0.104 ns/op           0 B/op          0 allocs/op
BenchmarkTest4-8        1000000000               0.102 ns/op           0 B/op          0 allocs/op
PASS

【題材2】文字列の二次元Sliceをinterfaceの二次元Sliceに変換する

文字列の二次元Sliceをinterfaceの二次元Sliceにする

やっていることは上の話と同じ部分が多い

性能比較用の関数定義

以下のような関数を用意して性能を比較する

  • convertStringSlicesToInterfaceSlices1
    • なんの工夫もないappend
  • convertStringSlicesToInterfaceSlices2
    • 事前にmake(interface{}, 0, len(ss))で格納するスライスの長さ分容量を確保してからappend
  • convertStringSlicesToInterfaceSlices3
    • 事前にmake(interface{}, len(ss))で格納するスライスの長さ分のスライスを用意して、要素を指定して格納
  • convertStringSlicesToInterfaceSlices4
    • string から interfaceのの変換部分をreflectで行う関数interfaceSliceを使うように、convertStringSlicesToInterfaceSlices3を一部書き換え
    • reflectは遅いと聞いているので本当か確認する
    • 参考:go - Type converting slices of interfaces - Stack Overflow
func convertStringSlicesToInterfaceSlices1(sss [][]string) [][]interface{} {
    var iss [][]interface{}
    for _, ss := range sss {
        var is []interface{}
        for _, s := range ss {
            is = append(is, s)
        }
        iss = append(iss, is)
    }
    return iss
}

func convertStringSlicesToInterfaceSlices2(sss [][]string) [][]interface{} {
    iss := make([][]interface{}, 0, len(sss))
    for _, ss := range sss {
        is := make([]interface{}, 0, len(ss))
        for _, s := range ss {
            is = append(is, s)
        }
        iss = append(iss, is)
    }
    return iss
}

func convertStringSlicesToInterfaceSlices3(sss [][]string) [][]interface{} {
    iss := make([][]interface{}, len(sss))
    for i, ss := range sss {
        is := make([]interface{}, len(ss))
        for j, s := range ss {
            is[j] = s
        }
        iss[i] = is
    }
    return iss
}

func convertStringSlicesToInterfaceSlices4(sss [][]string) [][]interface{} {
    iss := make([][]interface{}, len(sss))
    for i, ss := range sss {
        iss[i] = interfaceSlice(ss)
    }
    return iss
}

func interfaceSlice(slice interface{}) []interface{} {
    s := reflect.ValueOf(slice)
    if s.Kind() != reflect.Slice {
        panic("interfaceSlice() given a non-slice type")
    }

    ret := make([]interface{}, s.Len())
    for i := 0; i < s.Len(); i++ {
        ret[i] = s.Index(i).Interface()
    }
    return ret
}

関数のテスト

念のため、全部の関数の機能がすべて想定通りかテストをしておく

以下では、makeStringSlicesで試験データを用意して、それを用意した関数に与えて、makeInterfaceSlicesと同じかどうかを比較する

  • ちなみにSssはStringSlices、IssはInterfaceSlicesの略
func makeStringSlices(n, m int) [][]string {
    var ress [][]string
    for i := 0; i < n; i++ {
        var res []string
        for j := 0; j < m; j++ {
            res = append(res, fmt.Sprintf("%d", j))
        }
        ress = append(ress, res)
    }
    return ress
}

func makeInterfaceSlices(n, m int) [][]interface{} {
    var ress [][]interface{}
    for i := 0; i < n; i++ {
        var res []interface{}
        for j := 0; j < m; j++ {
            res = append(res, fmt.Sprintf("%d", j))
        }
        ress = append(ress, res)
    }
    return ress
}

func TestConvertStringSlicesToInterfaceSlices(t *testing.T) {
    inputSss := makeStringSlices(3, 3)
    wantIss := makeInterfaceSlices(3, 3)
    t.Log("inputSss:", inputSss)
    t.Log("wantIss:", wantIss)
    if !reflect.DeepEqual(convertStringSlicesToInterfaceSlices1(inputSss), wantIss) {
        t.Error("failed to convertStringSlicesToInterfaceSlices1")
    }
    if !reflect.DeepEqual(convertStringSlicesToInterfaceSlices2(inputSss), wantIss) {
        t.Error("failed to convertStringSlicesToInterfaceSlices2")
    }
    if !reflect.DeepEqual(convertStringSlicesToInterfaceSlices3(inputSss), wantIss) {
        t.Error("failed to convertStringSlicesToInterfaceSlices3")
    }
    if !reflect.DeepEqual(convertStringSlicesToInterfaceSlices4(inputSss), wantIss) {
        t.Error("failed to convertStringSlicesToInterfaceSlices4")
    }
}

test実行結果 - inputSssとwantIssが同じことが確認できた

$go test -v sample/sample_test.go --run TestConvertStringSlicesToInterfaceSlices              
=== RUN   TestConvertStringSlicesToInterfaceSlices
--- PASS: TestConvertStringSlicesToInterfaceSlices (0.00s)
    sample_test.go:93: inputSss: [[0 1 2] [0 1 2] [0 1 2]]
    sample_test.go:94: wantIss: [[0 1 2] [0 1 2] [0 1 2]]
PASS
ok      command-line-arguments  0.001s

Benchmarkを計測

各関数の働きがすべて同じだということがわかったので、性能を確認する

以下で、var result interface{}を定義して、result = resと代入しているのには理由があって、for文でループをいっぱい回しても、forの外に影響を及ぼさないとコンパイラがfor文の中身を無視してしまってBenchmarkが想定通りに測れないという話を聞いたのでそれを参考にしている

参考:How to write benchmarks in Go | Dave Cheney

var N = 10000
var result [][]interface{}

func BenchmarkTestConvertSlices1(b *testing.B) {
    inputSss := makeStringSlices(10, 10)
    b.ResetTimer()
    var res [][]interface{}
    for i := 0; i < N; i++ {
        res = convertStringSlicesToInterfaceSlices1(inputSss)
    }
    result = res
}

func BenchmarkTestConvertSlices2(b *testing.B) {
    inputSss := makeStringSlices(10, 10)
    b.ResetTimer()
    var res [][]interface{}
    for i := 0; i < N; i++ {
        res = convertStringSlicesToInterfaceSlices2(inputSss)
    }
    result = res
}

func BenchmarkTestConvertSlices3(b *testing.B) {
    inputSss := makeStringSlices(10, 10)
    b.ResetTimer()
    var res [][]interface{}
    for i := 0; i < N; i++ {
        res = convertStringSlicesToInterfaceSlices3(inputSss)
    }
    result = res
}

func BenchmarkTestConvertSlices4(b *testing.B) {
    inputSss := makeStringSlices(10, 10)
    b.ResetTimer()
    var res [][]interface{}
    for i := 0; i < N; i++ {
        res = convertStringSlicesToInterfaceSlices4(inputSss)
    }
    result = res
}

count=3で各Benchmarkを3回ずつ実行した結果は以下の通り

[~/go/src/github.com/ludwig125_gosample] $go test -benchmem -run=^$ github.com/ludwig125_gosample/convertSlice -bench . --count=3
goos: linux
goarch: amd64
pkg: github.com/ludwig125_gosample/convertSlice
BenchmarkTestConvertSlices1     1000000000               0.0644 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices1     1000000000               0.0650 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices1     1000000000               0.0720 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices2     1000000000               0.0385 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices2     1000000000               0.0363 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices2     1000000000               0.0358 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices3     1000000000               0.0342 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices3     1000000000               0.0349 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices3     1000000000               0.0435 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices4     1000000000               0.0629 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices4     1000000000               0.0662 ns/op          0 B/op          0 allocs/op
BenchmarkTestConvertSlices4     1000000000               0.0655 ns/op          0 B/op          0 allocs/op
PASS
ok      github.com/ludwig125_gosample/convertSlice      5.747s
[~/go/src/github.com/ludwig125_gosample] $

結果

  • sliceの容量を確保してからappendするBenchmarkTestConvertSlices2か、長さを確保してから要素に代入するBenchmarkTestConvertSlices3が速いことが確認できた
  • reflectは遅いので使わないほうがいいと再認識した

  • 実験につかった関数名が長くて後悔した

【題材3】スライスの前方に追加する(Prepend)

  • スライスの後ろに要素を追加するAppendと異なり、スライスの前方に要素を追加するPrependはちょっと難しい

簡単な例はこれ

const size = 32

func prependSimple() {
    data := make([]int, 0, size)
    for i := 0; i < size; i++ {
        data = append([]int{i}, data...)
    }
    // data: [31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0]
}

ただこれはBenchmarkを取ってみると毎回新しいスライスを作るため性能が悪いことがわかる

ちょっとわかりにくいけど、以下の方がはるかに速い

func prependWithCopy() {
    data := make([]int, 0, size)
    for i := 0; i < size; i++ {
        data = append(data, 0)
        copy(data[1:], data)
        data[0] = i
    }
    result = data
}

copy関数はbuildinのもので、copy(b, a) でスライスaをスライスbにコピーすることができる

Benchmark測定

注意点

The benchmark function must run the target code b.N times. During benchmark execution, b.N is adjusted until the benchmark function lasts long enough to be timed reliably.
package main

import (
    "fmt"
    "testing"
)

var result []int

const size = 32

func prependSimple() {
    data := make([]int, 0, size)
    for i := 0; i < size; i++ {
        data = append([]int{i}, data...)
    }
    result = data
}

func prependWithCopy() {
    data := make([]int, 0, size)
    for i := 0; i < size; i++ {
        data = append(data, 0)
        copy(data[1:], data)
        data[0] = i
    }
    result = data
}

func TestPrependSimple(t *testing.T) {
    prependSimple()
    fmt.Println("prependSimple:  ", result)
}

func TestPrependWithCopy(t *testing.T) {
    prependWithCopy()
    fmt.Println("prependWithCopy:", result)
}
func BenchmarkPrependSimple(b *testing.B) {
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        prependSimple()
    }
}

func BenchmarkPrependWithCopy(b *testing.B) {
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        prependWithCopy()
    }
}

実行結果

$go test -bench . -benchmem -count=4
prependSimple:   [31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0]
prependWithCopy: [31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0]
prependSimple:   [31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0]
prependWithCopy: [31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0]
prependSimple:   [31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0]
prependWithCopy: [31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0]
prependSimple:   [31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0]
prependWithCopy: [31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0]
goos: linux
goarch: amd64
BenchmarkPrependSimple-8          453279              2575 ns/op            4848 B/op         64 allocs/op
BenchmarkPrependSimple-8          408256              2610 ns/op            4848 B/op         64 allocs/op
BenchmarkPrependSimple-8          454341              3956 ns/op            4848 B/op         64 allocs/op
BenchmarkPrependSimple-8          441591              2704 ns/op            4848 B/op         64 allocs/op
BenchmarkPrependWithCopy-8       3420060               331 ns/op             256 B/op          1 allocs/op
BenchmarkPrependWithCopy-8       3519646               332 ns/op             256 B/op          1 allocs/op
BenchmarkPrependWithCopy-8       3484824               332 ns/op             256 B/op          1 allocs/op
BenchmarkPrependWithCopy-8       3457867               333 ns/op             256 B/op          1 allocs/op
PASS

copyを使った関数の方がはるかに性能がいいことがわかる

  • 1ループごとの所要時間は 3000 ns/op -> 300 ns/opと約10倍の速さ
  • アロケーションされるバイト数も5000 B/op 弱-> 250 B/op くらいまで下がっている

go言語のpipeline、fan-in、fan-out

関連

ludwig125.hatenablog.com

pipeline(パイプライン)

Go言語による並行処理を読んでパイプラインやファンイン、ファンアウトについて自分なりに理解したので具体例とともに挙動を書く

O'Reilly Japan - Go言語による並行処理

pipelineや、fan-in(ファンイン)、fan-out(ファンアウト)についてその挙動を見るために、同じ題材に異なる3つ方法で対応してみる

題材

- 6つの作業(タスク)を処理する場合を考える
- 各作業は互いに依存がなく並行処理が可能
- タスク番号とそれに応じた必要作業時間が与えられる

対応方法

  1. goroutineを一切使わない方法で処理する
  2. pipelineを使う
  3. fan-inとfan-outを使った並行処理をする

以下ではTaskという構造体にTask番号とTaskにかかるコスト(作業時間)を定義して、複数Taskを処理するのにどれだけ時間がかかるか、どのように処理されていくかを見てみる

Task構造体とtaskList

type Task struct {
    Number int
    Cost   time.Duration
}
taskList = []Task{
    Task{1, 1 * time.Second},
    Task{2, 7 * time.Second},
    Task{3, 2 * time.Second},
    Task{4, 3 * time.Second},
    Task{5, 5 * time.Second},
    Task{6, 3 * time.Second},
}

■1. goroutineを一切使わない場合

まずはgoroutineを一切使わない場合から考える - 書くのも処理の結果も一番簡単で分かりやすい - 各TaskのListを順番に受け取って、TaskのCost分Sleepしている

package main

import (
    "log"
    "time"
)

// Task はtask番号とtaskにかかるcost(作業時間)をまとめた構造体
type Task struct {
    Number int
    Cost   time.Duration
}

// taskを処理して処理済みのTask番号をSliceとして返す関数
func doTask(tasks []Task) []int {
    var doneTaskList []int
    for _, task := range tasks {
        log.Printf("do task number: %d\n", task.Number)
        // taskのための処理をする
        // ここではtask にかかるCostだけSleepする
        time.Sleep(task.Cost)
        doneTaskList = append(doneTaskList, task.Number) // 処理済みtask番号をlistにつめる
    }
    return doneTaskList
}

func main() {
    start := time.Now()

    // taskListに各Task番号とそのCostを定義する
    taskList := []Task{
        Task{1, 1 * time.Second},
        Task{2, 7 * time.Second},
        Task{3, 2 * time.Second},
        Task{4, 3 * time.Second},
        Task{5, 5 * time.Second},
        Task{6, 3 * time.Second},
    }

    count := 0
    for _, d := range doTask(taskList) { // 処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }
    log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

これを実行すると以下の結果になる

$go run goroutine/fanin/fanin3/nogoroutine/main.go 
2019/09/30 05:03:25 do task number: 1
2019/09/30 05:03:26 do task number: 2
2019/09/30 05:03:33 do task number: 3
2019/09/30 05:03:35 do task number: 4
2019/09/30 05:03:38 do task number: 5
2019/09/30 05:03:43 do task number: 6
2019/09/30 05:03:46 done task number: 1
2019/09/30 05:03:46 done task number: 2
2019/09/30 05:03:46 done task number: 3
2019/09/30 05:03:46 done task number: 4
2019/09/30 05:03:46 done task number: 5
2019/09/30 05:03:46 done task number: 6
2019/09/30 05:03:46 Finished. Done 6 tasks. Total time: 21.004066s
  • すべてのTaskが終わってから、最後に処理されたTask番号を出力している様子がわかる
  • 処理時間の合計は、Costの合計とほぼ同じ

■2. pipelineを使う

上のコードをchannelを使った方法で書き直してみる。

pipelineにするだけではうまみが感じられないが、 これは、後に書くfan-inとfan-outのために重要な理解になる。

package main

import (
    "context"
    "log"
    "time"
)

// Task はtask番号とtaskにかかるcost(作業時間)をまとめた構造体
type Task struct {
    Number int
    Cost   time.Duration
}

// taskをchannel化するgenerator
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
    taskCh := make(chan Task)

    go func() {
        defer close(taskCh)
        for _, task := range taskList {
            select {
            case <-ctx.Done():
                return
            case taskCh <- task: // taskをchannelにつめる
            }
        }
    }()
    return taskCh
}

// taskを処理して処理済みのTask番号をchannelとして返す関数
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
    doneTaskCh := make(chan int)
    go func() {
        defer close(doneTaskCh)
        for task := range taskCh {
            select {
            case <-ctx.Done():
                return
            default:
            }
            log.Printf("do task number: %d\n", task.Number)
            // taskのための処理をする
            // ここではtask にかかるCostだけSleepする
            time.Sleep(task.Cost)
            doneTaskCh <- task.Number // 処理済みtask番号をchannelにつめる
        }
    }()
    return doneTaskCh
}

func main() {
    start := time.Now()
    // 処理の途中で中断されてもgoroutineリークしないようにcontextを使う
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // taskListに各Task番号とそのCostを定義する
    taskList := []Task{
        Task{1, 1 * time.Second},
        Task{2, 7 * time.Second},
        Task{3, 2 * time.Second},
        Task{4, 3 * time.Second},
        Task{5, 5 * time.Second},
        Task{6, 3 * time.Second},
    }

    // taskChannelGerenatorとdoTaskという2つのステージをまとめたpipelineを定義する
    pipeline := doTask(ctx, taskChannelGerenator(ctx, taskList))
    count := 0
    for d := range pipeline { // pipelineから処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }

    log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

以下内容の説明

taskChannelGerenator
  • taskChannelGerenatorという関数を最初に呼び出して、これにtaskListを渡している
  • taskChannelGerenatorは、taskListを受け取って、これをTask型のchannelにする関数
  • pipelineの最初に変数をchannelに変換する処理が多く使われいて、一般にgenとかgeneratorなどと呼ばれることが多い
doTask
  • doTaskは、generatorの作ったTask channelを「for task := range taskCh」でそれぞれ処理する
  • goroutineの外部から終了を受け取って止まるようにするために、以下のselect文を挟んでいる
select {
case <-ctx.Done():
    return
default:
}
  • 最後に、「doneTaskCh <- task.Number」の部分で処理済みtask番号をchannelにつめる
main
  • 処理の途中で中断されてもgoroutineリークしないようにcontextを定義
  • 処理の最後に必ずすべて終わるようにdefer cancelをつける
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
  • taskChannelGerenatorとdoTaskという2つの関数をまとめたpipelineを定義する
  • generatorもdoTaskもchannelを受け取ってchannelを返す関数
  • このように入力と出力の型を同じにして、自由に組み合わせ可能な処理をそれぞれステージと呼び、ステージのまとまりをpipelineと呼ぶ
pipeline := doTask(ctx, taskChannelGerenator(ctx, taskList))
  • pipelineから処理済みtaskの番号を読み出し
for d := range pipeline {
略
}

実行結果

$go run goroutine/fanin/fanin3/before/main.go     
2019/09/30 05:15:52 do task number: 1
2019/09/30 05:15:53 do task number: 2
2019/09/30 05:15:53 done task number: 1
2019/09/30 05:16:00 do task number: 3
2019/09/30 05:16:00 done task number: 2
2019/09/30 05:16:02 do task number: 4
2019/09/30 05:16:02 done task number: 3
2019/09/30 05:16:05 do task number: 5
2019/09/30 05:16:05 done task number: 4
2019/09/30 05:16:10 do task number: 6
2019/09/30 05:16:10 done task number: 5
2019/09/30 05:16:13 done task number: 6
2019/09/30 05:16:13 Finished. Done 6 tasks. Total time: 21.006284s
  • doTaskの処理がすべて終わってからdoneが出力されているわけではないことがわかる
  • ただ、この時点ではpipeline化する前と処理の合計時間が変わらないのでうれしいことがあまりない

■3. fan-inとfan-outを使った並行処理をする

上のpipelineの速度を改善するために、doTaskの処理を並行で複数いっぺんに処理することを考える

1本のpipelineの流れを複数に分ける処理(同時に複数のgoroutineを起動する処理)をfan-out(ファンアウト)と呼び、 複数のgoroutineで処理されたchannelを1本に集約する処理をfan-in(ファンイン)と呼ぶ

package main

import (
    "context"
    "log"
    "sync"
    "time"
)

// Task はtask番号とtaskにかかるcost(作業時間)をまとめた構造体
type Task struct {
    Number int
    Cost   time.Duration
}

// taskをchannel化するgenerator
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
    taskCh := make(chan Task)

    go func() {
        defer close(taskCh)
        for _, task := range taskList {
            select {
            case <-ctx.Done():
                return
            case taskCh <- task:
            }
        }
    }()
    return taskCh
}

// taskを処理して処理済みのTask番号をchannelとして返す関数
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
    doneTaskCh := make(chan int)
    go func() {
        defer close(doneTaskCh)
        for task := range taskCh {
            select {
            case <-ctx.Done():
                return
            default:
                log.Printf("do task number: %d\n", task.Number)
                // taskのための処理をする
                // ここではtask にかかるCostだけSleepする
                time.Sleep(task.Cost)
                doneTaskCh <- task.Number // 処理済みtask番号をchannelにつめる
            }
        }
    }()
    return doneTaskCh
}

func merge(ctx context.Context, taskChs []<-chan int) <-chan int {
    var wg sync.WaitGroup
    mergedTaskCh := make(chan int)

    mergeTask := func(taskCh <-chan int) {
        defer wg.Done()
        for t := range taskCh {
            select {
            case <-ctx.Done():
                return
            case mergedTaskCh <- t:
            }
        }
    }

    wg.Add(len(taskChs))
    for _, taskCh := range taskChs {
        go mergeTask(taskCh)
    }
    // 全てのtaskが処理されるまで待つ
    go func() {
        wg.Wait()
        close(mergedTaskCh)
    }()
    return mergedTaskCh
}

func main() {
    start := time.Now()
    // 処理の途中で中断されてもgoroutineリークしないようにcontextを使う(done channelでもいい)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // taskListに各Task番号とそのCostを定義する
    taskList := []Task{
        Task{1, 1 * time.Second},
        Task{2, 7 * time.Second},
        Task{3, 2 * time.Second},
        Task{4, 3 * time.Second},
        Task{5, 5 * time.Second},
        Task{6, 3 * time.Second},
    }
    taskCh := taskChannelGerenator(ctx, taskList)

    numWorkers := 4
    workers := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        workers[i] = doTask(ctx, taskCh)
    }

    count := 0
    for d := range merge(ctx, workers) { // mergeから処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }
    log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

以下内容の説明

main
  • 一旦taskChannelGerenatorで作ったchannelを受け取り、doTask(ctx, taskCh)を複数for文で実行する
  • 実行結果(intのsliceのchannel)はworkersに格納する
  • doTaskを複数のworkersに処理を分けているので、これがfan-out処理にあたる
taskCh := taskChannelGerenator(ctx, taskList)
numWorkers := 4
workers := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
    workers[i] = doTask(ctx, taskCh)
}
merge
  • merge関数は引数としてworkersが渡した複数のchannel「[]<-chan int」を受け取って、単一のchannel「<-chan int」を返す関数
  • 複数channelを一つにmergeしているので、これがfan-inにあたる
func merge(ctx context.Context, taskChs []<-chan int) <-chan int 
  • 以下の部分では、taskChsの数だけchannelを受け取ってそれぞれのchannelをgoroutineとして起動したmergeTaskに渡す
    • ここでmergeTaskに渡しているchannelは複数のうち1つだけだということに注意
    • 複数channelを複数goroutineでそれぞれ処理するということ
  • 最後に複数goroutineの待ち合わせをする必要があるので、事前にtaskChsの数だけwg.Addしている
wg.Add(len(taskChs))
for _, taskCh := range taskChs {
    go mergeTask(taskCh)
}
  • mergeTaskでは処理が終わったら「defer wg.Done()」でwgの数を減らすようにする
  • for select文の中で、ctx.Done()を受け取る部分は処理の中断用で前述と同じ
  • 事前にmergedTaskChという単一のchannelを定義する
  • goroutineでそれぞれ処理されるmergeTask関数の中で、受け取ったchannelは、このmergedTaskChに詰められる
    • ここで複数のchannelが1つになっている
 mergedTaskCh := make(chan int)
    mergeTask := func(taskCh <-chan int) {
        defer wg.Done()
        for t := range taskCh {
            select {
            case <-ctx.Done():
                return
            case mergedTaskCh <- t:
            }
        }
    }
  • 最後に、wgを待ち合わせて、mergedTaskChをcloseして、mergedTaskChを返す
 // 全てのtaskが処理されるまで待つ
    go func() {
        wg.Wait()
        close(mergedTaskCh)
    }()
    return mergedTaskCh
}
再びmain関数
  • merge関数が返すchannelを受け取って、Task番号を出力する
 for d := range merge(ctx, workers) { // mergeから処理済みtaskの番号を読み出し
        count++
        log.Printf("done task number: %d\n", d)
    }

実行結果

$go run goroutine/fanin/fanin3/after/main.go 
2019/09/30 06:19:14 do task number: 1
2019/09/30 06:19:14 do task number: 4
2019/09/30 06:19:14 do task number: 2
2019/09/30 06:19:14 do task number: 3
2019/09/30 06:19:15 do task number: 5
2019/09/30 06:19:15 done task number: 1
2019/09/30 06:19:16 do task number: 6
2019/09/30 06:19:16 done task number: 3
2019/09/30 06:19:17 done task number: 4
2019/09/30 06:19:19 done task number: 6
2019/09/30 06:19:20 done task number: 5
2019/09/30 06:19:21 done task number: 2
2019/09/30 06:19:21 Finished. Done 6 tasks. Total time: 7.001934s
  • numWorkersを4にして実行すると上の結果が得られた
  • 同時並行で4つのpipelineが起動するので、これだけ早くなっている

numWorkersをいくつにするかは、使っているマシンのコア数などによって決まってくる

go言語で複数のgoroutineのエラーハンドリングをする

関連

ludwig125.hatenablog.com

複数のgoroutineの結果の取得

複数のgoroutineの結果の取得1(エラーが起きると中断する例)

第5章 並行プログラミング―ゴルーチンとチャネルを使いこなす:はじめてのGo―シンプルな言語仕様,型システム,並行処理|gihyo.jp … 技術評論社

こちらのコードを参考に以下のような複数のgoroutineを実行してその結果を取得する場合を考える

package main

import (
        "fmt"
        "log"
        "net/http"
)

func getStatus(urls []string) <-chan string {
        statusChan := make(chan string)
        for _, url := range urls {
                go func(url string) {
                        res, err := http.Get(url)
                        if err != nil {
                                log.Fatal(err)
                        }
                        statusChan <- res.Status
                }(url)
        }
        return statusChan
}

func main() {
        urls := []string{"https://www.google.com", "https://www.yahoo.co.jp/"}
        statusChan := getStatus(urls)

        for i := 0; i < len(urls); i++ {
                fmt.Println(<-statusChan)
        }
}

これを実行すると以下のような結果になる

$go run goroutine0.go 
200 OK
200 OK

しかし、このコードの場合、どれか一つでもhttp.Getに失敗するとそこで処理を中断してしまうようになっている

試しに、上のurlsを以下のように書き換えてみる

urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}

実行するとhttps://badhostの時点で終了する

$go run goroutine0.go
2019/05/12 07:14:01 Get https://badhost: dial tcp: lookup badhost: no such host
exit status 1

もしhttp.Getに失敗した場合に、そのままstatusChan <- res.Statusをしようとすると、 以下のようなnil pointer参照のpanicが起きてしまうので、このコードではhttp.Getが失敗した時点でlog.Fatalによってエラーメッセージとともに処理を中断することになっている

http.Get error: Get https://badhost: dial tcp: lookup badhost: no such host
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x40 pc=0x5ec38e]

エラーが起きても処理を中断してほしくない場合は以下のように書ける

複数のgoroutineの結果の取得2(エラー処理ができない例)

上のコードを元に、エラーが起きてもその他の処理を進めるようにしたのが以下になる

package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    checkStatus := func(urls []string) <-chan *http.Response {
        resultChan := make(chan *http.Response)
        for _, url := range urls {
            go func(url string) {
                resp, err := http.Get(url)
                // http.Getに時間がかかった場合を模するためにsleep                                    
                fmt.Println("sleep 2 sec")
                time.Sleep(2 * time.Second)
                if err != nil {
                    fmt.Printf("http.Get error: %v\n", err)
                }
                resultChan <- resp
            }(url)
        }
        return resultChan
    }
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    resultChan := checkStatus(urls)

    for i := 0; i < len(urls); i++ {
        result := <-resultChan
        if result == nil {
            fmt.Printf("Response: nil\n")
            continue
        }
        fmt.Printf("Response: %s\n", result.Status)
    }
}
  • http.Getでerrorが生じてもnil pointerのpanicが起きないように、「resultChan <- resp」のように結果をまるごと入れている。Statusは関数の呼び出し側で見ることにする
  • http.Getで失敗すると当然respはnilなので、channelの呼び出しの際に、「if result == nil」という条件分岐をしている

これを実行すると以下のようになる

$go run goroutine2.go
sleep 2 sec
sleep 2 sec
sleep 2 sec
http.Get error: Get https://badhost: dial tcp: lookup badhost: no such host
Response: nil
Response: 200 OK
Response: 200 OK

これはなかなか良さそうだが、関数の呼び出し側でエラーハンドリングができないという問題がある。

ここでは、nilが返ってきたことしかわからない

できれば「Get https://badhost: dial tcp: lookup badhost: no such host」を関数の呼び出し側で出力できるようにしたい

複数のgoroutineの結果の取得3(呼び出し側でエラー処理ができるようにした例)

Go言語による並行処理 などを読むと以下のようにErrorとResponseをひとまとめにした構造体を返せばいいとある concurrency-in-go-src/fig-patterns-proper-err-handling.go at 4e55fd7f3f5b9c5efc45a841702393a1485ba206 · kat-co/concurrency-in-go-src · GitHub

これを参考に上のコードを以下のように直してみる

package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    type Result struct {
        Error    error
        Response *http.Response
    }
    checkStatus := func(urls []string) <-chan Result {
        resultChan := make(chan Result)
        for _, url := range urls {
            go func(url string) {
                resp, err := http.Get(url)
                fmt.Println("sleep 2 sec")
                time.Sleep(2 * time.Second)
                resultChan <- Result{Error: err, Response: resp}                                                                   
            }(url)
        }   
        return resultChan
    }   
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    result := checkStatus(urls)
    
    for i := 0; i < len(urls); i++ {
        res := <-result
        if res.Error != nil {
            fmt.Printf("error: %v\n", res.Error)
            continue
        }
        fmt.Printf("Response: %v\n", res.Response.Status)
    }
}   

res.Errorを関数の呼び出し側で出力できた。 これができればエラーの時は~などの処理がやりやすくなりそう

複数のgoroutineの結果の取得4(呼び出し側でエラー処理ができるようにした例 ※for range使用)

上のコードでも問題ないと思うが、勉強のために別の書き方も書いてみたい

ここまではchannelの読み込みを以下のようにしていたが、

result := checkStatus(urls)                                                                                                  
for i := 0; i < len(urls); i++ {                                  
    res := <-result                                               
    ~
}

channelは以下のような受け取り方もできる

for result := range checkStatus(urls) {
  ~
}

ただ、for rangeの書き方をするには注意が必要になる

  • for rangeでchannelを受け取る場合、goroutine側でchennelのcloseが必要になる
    • closeしないと終わりがわからずmain側のfor range文が延々と待ち続けることになる
  • では単純にgoroutineの中で以下のように「defer close(results) 」してもいいのかというとそうではない
// うまくいかない例
func main() {
    type Result struct {
        Error    error
        Response *http.Response
    }
    checkStatus := func(urls []string) <-chan Result {
        resultChan := make(chan Result, 10)                                                     
        
        for _, url := range urls {
            go func(url string) {
                defer close(resultChan) // ここでcloseすると複数goroutine全部を待てない
                resp, err := http.Get(url)
                fmt.Println("sleep 2 sec")
                time.Sleep(2 * time.Second)
                resultChan <- Result{Error: err, Response: resp}
            }(url)
        }   
        return resultChan
    }   
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    
    for result := range checkStatus(urls) {
        if result.Error != nil {
            fmt.Printf("error: %v\n", result.Error)
            continue
        }
        fmt.Printf("Response: %v\n", result.Response.Status)
    }
}   

これを実行すると以下のようになる

sleep 2 sec
sleep 2 sec
sleep 2 sec
error: Get https://badhost: dial tcp: lookup badhost: no such host
  • goroutineが一つしか実行されていない

  • 複数のgoroutineの処理を待ってからclose(resultChan)をするために、sync.WaitGroupを使ってみた

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

func main() {
    type Result struct {
        Error    error
        Response *http.Response
    }
    checkStatus := func(urls []string) <-chan Result {
        resultChan := make(chan Result, 10)
        wg := new(sync.WaitGroup)

        defer close(resultChan)
        for _, url := range urls {
            wg.Add(1)
            go func(url string) {
                defer wg.Done()
                resp, err := http.Get(url)                                                                               
                fmt.Println("sleep 2 sec")
                time.Sleep(2 * time.Second)
                resultChan <- Result{Error: err, Response: resp}
            }(url)
        }   
        wg.Wait()
        return resultChan
    }   
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}
    
    for result := range checkStatus(urls) {
        if result.Error != nil {
            fmt.Printf("error: %v\n", result.Error)
            continue
        }
        fmt.Printf("Response: %v\n", result.Response.Status)
    }
}   

実行結果は以下のようになる

sleep 2 sec
sleep 2 sec
sleep 2 sec
error: Get https://badhost: dial tcp: lookup badhost: no such host
Response: 200 OK
Response: 200 OK

やりたいことが実現できた

複数のgoroutineの結果の取得5(呼び出し側でエラー処理ができるようにした例 ※for range使用 goroutineリークを避けるように工夫したもの)

上のコードは、selectを使って外部からgoroutineを中断するように変えると、goroutineリークによってメモリ使用率を圧迫することを避けることができる

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

func main() {
    type Result struct {
        Error    error
        Response *http.Response
    }
    checkStatus := func(done <-chan interface{}, urls []string) <-chan Result {
        resultChan := make(chan Result, 10)
        wg := new(sync.WaitGroup)

        defer close(resultChan)
        for _, url := range urls {
            wg.Add(1)
            go func(url string) {
                defer wg.Done()
                resp, err := http.Get(url)
                fmt.Println("sleep 2 sec")
                time.Sleep(2 * time.Second)
                select {
                case <-done:
                    return
                case resultChan <- Result{Error: err, Response: resp}:
                }
            }(url)
        }
        wg.Wait()
        return resultChan
    }

    done := make(chan interface{})
    defer close(done)
    urls := []string{"https://www.google.com", "https://badhost", "https://www.yahoo.co.jp/"}

    for result := range checkStatus(done, urls) {
        if result.Error != nil {
            fmt.Printf("error: %v\n", result.Error)
            continue
        }
        fmt.Printf("Response: %v\n", result.Response.Status)
    }
}

実行結果はこうなる

sleep 2 sec
sleep 2 sec
sleep 2 sec
error: Get https://badhost: dial tcp: lookup badhost: no such host
Response: 200 OK
Response: 200 OK

go言語で同時並列数を制御する

関連

ludwig125.hatenablog.com

同時並列数の制御

【同時並列数の制御】1. 並列数を制限しない場合

  • 並列数を制限しない場合はこの通り単純
  • 複数のgoroutineを起動する場合は、WaitGroupで待ち合わせをする
  • ※time.Sleep(1 * time.Second)は処理の様子をわかりやすくするため入れているだけで、実用では必要ない
package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    doTask()
    log.Println("finished")
}

func doTask() {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    for _, num := range numbers {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            fnA(n)

            // 処理をわかりやすくするため
            time.Sleep(1 * time.Second)
        }(num)
    }
    wg.Wait()
}

func fnA(n int) {
    log.Printf("do fnA. num: %d \n", n)
}

https://play.golang.org/p/JxdXBOThF0v

実行結果

2019/06/15 16:57:46 do fnA. num: 6 
2019/06/15 16:57:46 do fnA. num: 1 
2019/06/15 16:57:46 do fnA. num: 2 
2019/06/15 16:57:46 do fnA. num: 3 
2019/06/15 16:57:46 do fnA. num: 4 
2019/06/15 16:57:46 do fnA. num: 5 
2019/06/15 16:57:47 finished
  • 全てのgoroutineが同時に起動して、それぞれ1秒Sleepしたあとでfinishedが出力されている

【同時並列数の制御】2. 並列数を制限する場合

並列数を制限する場合 - 最大同時並列実行数をバッファサイズとしたチャネルを作り、そのチャネルの待ち合わせをすることで実現できる - semチャネルは、一旦concurrency数だけ受信したらバッファがいっぱいになるので、「<-sem」が呼ばれて解放されない限り、後続のgoroutineは起動しない => 最大同時並列実行数を制限できる

package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    doTask()
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

func doTask() {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    sem := make(chan struct{}, concurrency) // concurrency数のバッファ
    for _, num := range numbers {
        sem <- struct{}{}

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }() // 処理が終わったらチャネルを解放
            fnA(n)

            // 処理をわかりやすくするため
            time.Sleep(1 * time.Second)
        }(num)
    }
    wg.Wait()
}

func fnA(n int) {
    log.Printf("do fnA. num: %d \n", n)
}

https://play.golang.org/p/CEn0tw5SR-A

実行結果

2019/06/15 17:20:36 do fnA. num: 2 
2019/06/15 17:20:36 do fnA. num: 1 
2019/06/15 17:20:37 do fnA. num: 3 
2019/06/15 17:20:37 do fnA. num: 4 
2019/06/15 17:20:38 do fnA. num: 5 
2019/06/15 17:20:38 do fnA. num: 6 
2019/06/15 17:20:39 finished
  • concurrency数ずつ(ここでは2つずつ)1秒おきに実行されていることがわかる

参考

上のコードの「sem <- struct{}{}」の後ろでlen(sem)を出力してみると、一旦semチャネルのバッファがconcurrency数=2に達したら、あとは2を保ったまま後続のgoroutineが起動しているのがわかる

sem <- struct{}{}
fmt.Printf("len(sem): %d\n", len(sem)) // <- バッファ内の値を出力

実行結果

len(sem): 1
len(sem): 2
2019/06/15 20:54:28 do fnA. num: 2 
2019/06/15 20:54:28 do fnA. num: 1 
len(sem): 2
2019/06/15 20:54:29 do fnA. num: 3 
len(sem): 2
2019/06/15 20:54:29 do fnA. num: 4 
len(sem): 2
2019/06/15 20:54:30 do fnA. num: 5 
len(sem): 2
2019/06/15 20:54:30 do fnA. num: 6 
2019/06/15 20:54:31 finished

【同時並列数の制御】2-2. 並列数を制限する場合(チャネルを最後にcloseする)

上のをちょっと改良 - チャネルを使ったら最後にcloseしておいた方が安全なので、 - 以下のように全部のgoroutineを待って最後にチャネルをcloseするために、別のgoroutineを用意しておくと良い

  • goroutineが1つだけの場合は最初のgo func()内に、「defer close(チャネル)」を呼び出せばいいが、今回のように複数のgoroutineを待つ場合はこのように書くのが良さそう
func doTask() {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    sem := make(chan struct{}, concurrency)
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()
            fnA(n)

            // 処理をわかりやすくするため
            time.Sleep(1 * time.Second)
        }(num)
    }

    // 別のgoroutineで上の全部のgoroutineが終わるまで待つ
    // 終わったらチャネルをclose
    go func() {
        defer close(sem)
        wg.Wait()
    }()
}

https://play.golang.org/p/0MbVqYjU-B3

同時並列数の制御 その他のサンプル

同時並列数の制御として上とほとんど変わらないけど、後で書いたやつがあるのでサンプルとして載せておく 上ではwg.Wait()部分のみをgoroutineで抜き出していたが、これは関数の中身全体を一つのgoroutineで囲っている - こういう書き方もあるよって書いておきたかった - この書き方の問題は、全体をgo func()で囲っているために、関数の本体が長いと見づらくなるので自分はあまりしない

package main

import (
    "fmt"
    "log"
    "sync"
    "time"
)

func worker(msg string) <-chan string {
    var wg sync.WaitGroup
    res := make(chan string)
    limit := make(chan int, 3)
    go func() {
        for i := 0; i <= 10; i++ {
            limit <- 1
            fmt.Println("len", len(limit))

            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                // 1秒かかる処理のつもり
                // 同時にlimitのバッファサイズ単位で処理していることがわかりやすいようにSleep
                time.Sleep(1 * time.Second)

                res <- fmt.Sprintf("%s done %d", msg, i)
                <-limit
            }(i)
        }
        wg.Wait()
        close(res)
    }()
    return res
}

func main() {
    res := worker("job")
    for v := range res {
        log.Println(v)
    }
}

実行結果

len 1
len 2
len 3
len 3
2019/08/08 06:47:18 job done 1
2019/08/08 06:47:18 job done 2
2019/08/08 06:47:18 job done 0
len 3
len 3
len 3
2019/08/08 06:47:19 job done 5
2019/08/08 06:47:19 job done 3
2019/08/08 06:47:19 job done 4
len 3
len 3
len 3
2019/08/08 06:47:20 job done 8
2019/08/08 06:47:20 job done 6
2019/08/08 06:47:20 job done 7
len 3
2019/08/08 06:47:21 job done 10
2019/08/08 06:47:21 job done 9

【同時並列数の制御】3. 並列数を制限してエラー処理もする場合

上のコードで、fnAがエラーを返す場合のエラー処理を入れる場合は以下になる

package main

import (
    "fmt"
    "log"
    "sync"
    "time"
)

func main() {
    if err := doTask(); err != nil {
        log.Printf("error occured. %v", err)
    }
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

var errFlag bool = true

func doTask() error {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    sem := make(chan struct{}, concurrency)
    errChan := make(chan error, len(numbers))
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()
            if err := fnA(n); err != nil {
                errChan <- fmt.Errorf("failed to A, %v", err)
                log.Printf("--> fnA len(errChan) %d", len(errChan))

                time.Sleep(1 * time.Second) // 処理をわかりやすくするため
                return
            }
            time.Sleep(1 * time.Second) // 処理をわかりやすくするため
        }(num)
    }

    go func() {
        defer close(sem)
        defer close(errChan)
        wg.Wait()
    }()

    for err := range errChan {
        return err
    }
    return nil
}

func fnA(n int) error {
    log.Println("do fnA.")
    if errFlag {
        log.Printf("--> failed to do fnA. num: %d", n)
        return fmt.Errorf("error A. num: %d", n)
    }
    log.Printf("--> succeeded to do fnA. num: %d", n)
    return nil
}
  • goroutine内で生じたエラーを外に伝えるために、errChanというチャネルを用意 errChan := make(chan error, len(numbers))
    • このチャネルのバッファ数が重要!!
  • fnAの実行時にエラーが発生した場合はerrChanに送信
  • errChanからエラーを読み取って、errを返す
for err := range errChan {
    return err
}
  • wg.Wait()のあとに close(errChan) もする

errChanのバッファ数を起動されるgoroutineの数(ここではnumbersの6)だけ用意することで、エラーが複数発生してもチャネルが詰まらないようにしているのがポイント

https://play.golang.org/p/KdQB7fLn9Na

実行結果

2019/06/16 08:46:51 do fnA.
2019/06/16 08:46:51 --> failed to do fnA. num: 2
2019/06/16 08:46:51 --> fnA len(errChan) 1
2019/06/16 08:46:51 do fnA.
2019/06/16 08:46:51 --> failed to do fnA. num: 1
2019/06/16 08:46:51 --> fnA len(errChan) 2
2019/06/16 08:46:52 do fnA.
2019/06/16 08:46:52 --> failed to do fnA. num: 3
2019/06/16 08:46:52 --> fnA len(errChan) 3
2019/06/16 08:46:52 do fnA.
2019/06/16 08:46:52 --> failed to do fnA. num: 4
2019/06/16 08:46:52 --> fnA len(errChan) 4
2019/06/16 08:46:53 do fnA.
2019/06/16 08:46:53 --> failed to do fnA. num: 5
2019/06/16 08:46:53 --> fnA len(errChan) 5
2019/06/16 08:46:53 error occured. failed to A, error A. num: 2
2019/06/16 08:46:53 finished
  • エラーが発生するたびにerrChanのバッファが埋まっていく様子がわかる
バッファ数が足りないとどうなるか?

試しに、errChanのバッファ数を0にすると、読み取り手がいないエラーを複数投げようとして詰まってdeadlockが発生する errChan := make(chan error)

https://play.golang.org/p/Zy7xu6k8U9Y

実行結果

2019/06/16 08:25:37 do fnA.
2019/06/16 08:25:37 --> failed to do fnA.
2019/06/16 08:25:37 do fnA.
2019/06/16 08:25:37 --> failed to do fnA.
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
以下省略

参考

上のコードが全部成功した場合

一応載せておくとこんな感じ

var errFlag bool = false にして実行する

https://play.golang.org/p/ZmRaPLpWC3T

実行結果

2019/06/16 08:18:39 do fnA.
2019/06/16 08:18:39 --> succeeded to do fnA. num: 2
2019/06/16 08:18:39 do fnA.
2019/06/16 08:18:39 --> succeeded to do fnA. num: 1
2019/06/16 08:18:40 do fnA.
2019/06/16 08:18:40 --> succeeded to do fnA. num: 3
2019/06/16 08:18:40 do fnA.
2019/06/16 08:18:40 --> succeeded to do fnA. num: 4
2019/06/16 08:18:41 do fnA.
2019/06/16 08:18:41 --> succeeded to do fnA. num: 5
2019/06/16 08:18:41 do fnA.
2019/06/16 08:18:41 --> succeeded to do fnA. num: 6
2019/06/16 08:18:42 finished

【同時並列数の制御】4. contextを使ってエラー制御をきちんとする

上のエラーが起きたときの挙動を見てみると、エラーが起きてもすぐに終了していないことがわかる

上のコードで、起動時のnumを出力させて見ると以下のようになる

 for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信
        log.Printf("num: %d", num)  ← 出力

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()
            log.Printf("goroutine num: %d", num) ← 出力

https://play.golang.org/p/mlzfPOWDDWt

実行結果

2019/06/16 16:51:31 num: 1
2019/06/16 16:51:31 num: 2
2019/06/16 16:51:31 goroutine n: 2
2019/06/16 16:51:31 do fnA.
2019/06/16 16:51:31 --> failed to do fnA. num: 2
2019/06/16 16:51:31 --> fnA len(errChan) 1
2019/06/16 16:51:31 goroutine n: 1
2019/06/16 16:51:31 do fnA.
2019/06/16 16:51:31 --> failed to do fnA. num: 1
2019/06/16 16:51:31 --> fnA len(errChan) 2
2019/06/16 16:51:32 num: 3
2019/06/16 16:51:32 goroutine n: 3
2019/06/16 16:51:32 do fnA.
2019/06/16 16:51:32 --> failed to do fnA. num: 3
2019/06/16 16:51:32 --> fnA len(errChan) 3
2019/06/16 16:51:32 num: 4
2019/06/16 16:51:32 goroutine n: 4
2019/06/16 16:51:32 do fnA.
2019/06/16 16:51:32 --> failed to do fnA. num: 4
2019/06/16 16:51:32 --> fnA len(errChan) 4
2019/06/16 16:51:33 num: 5
2019/06/16 16:51:33 goroutine n: 5
2019/06/16 16:51:33 do fnA.
2019/06/16 16:51:33 --> failed to do fnA. num: 5
2019/06/16 16:51:33 --> fnA len(errChan) 5
2019/06/16 16:51:33 num: 6
2019/06/16 16:51:33 error occured. failed to A, error A. num: 2
2019/06/16 16:51:33 finished

これはリソースの無駄なので、エラーが起きたら即終了させるようにしたい

こういうときはcontextが便利

「【同時並列数の制御】3」のソースコードをcontextを使って以下のように書き直す

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

func main() {
    if err := doTask(); err != nil {
        log.Printf("error occured. %v", err)
    }
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

var errFlag bool = true

func doTask() error {
    numbers := []int{1, 2, 3, 4, 5, 6}

    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background()) // contextとキャンセル関数を定義
    defer cancel() // doTask終了時に子プロセスを全て終了するようにしたい

    sem := make(chan struct{}, concurrency)
    errChan := make(chan error, len(numbers))
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信
        log.Printf("num: %d", num)

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }()

            log.Printf("goroutine num: %d", n)
            select {
            case <-ctx.Done(): // contextのcancelが呼び出されたらここに入って即終了する
                return
            default:
            }
            if err := fnA(n); err != nil {
                errChan <- fmt.Errorf("failed to A, %v", err)
                log.Printf("--> fnA len(errChan) %d", len(errChan))

                // エラーが発生したら他の処理はキャンセル
                cancel()
                time.Sleep(1 * time.Second) // 処理をわかりやすくするため
                return
            }
            time.Sleep(1 * time.Second) // 処理をわかりやすくするため
        }(num)
    }

    go func() {
        defer close(sem)
        defer close(errChan)
        wg.Wait()
    }()

    for err := range errChan {
        return err
    }
    return nil
}

func fnA(n int) error {
    log.Println("do fnA.")
    if errFlag {
        log.Printf("--> failed to do fnA. num: %d", n)
        return fmt.Errorf("error A. num: %d", n)
    }
    log.Printf("--> succeeded to do fnA. num: %d", n)
    return nil
}
  • contextのcancelが呼び出されたら「<-ctx.Done()」を受け取って即終了するようにする
select {
case <-ctx.Done():
    return
default:
}
  • エラーが発生したら他の処理はキャンセルするため cancel() を送る

https://play.golang.org/p/N1mjZlo51VV

実行結果

2019/06/16 16:54:11 num: 1
2019/06/16 16:54:11 num: 2
2019/06/16 16:54:11 goroutine num: 3
2019/06/16 16:54:11 do fnA.
2019/06/16 16:54:11 --> failed to do fnA. num: 2
2019/06/16 16:54:11 --> fnA len(errChan) 1
2019/06/16 16:54:11 goroutine num: 3
2019/06/16 16:54:11 num: 3
2019/06/16 16:54:11 goroutine num: 4
2019/06/16 16:54:11 num: 4
2019/06/16 16:54:11 goroutine num: 5
2019/06/16 16:54:11 num: 5
2019/06/16 16:54:11 goroutine num: 6
2019/06/16 16:54:11 num: 6
2019/06/16 16:54:11 error occured. failed to A, error A. num: 2
2019/06/16 16:54:11 finished
  • 「do fnA. 」は一度しか呼び出されていない
  • 一つエラーが発生したら、それ以外のgoroutineは起動してもすぐに処理が終わっていることがわかる

【同時並列数の制御】5. contextに加えてerrgroupを使ってエラー制御をかんたんにする

errgroupを使うことで、エラー制御が便利になる。

以下は、syncの代わりにerrgroupを使っている

  • go get golang.org/x/sync/errgroup でerrgroupを取得
  • errChanは使わないで済むようになった
  • 失敗した時の他の処理の取り消しはcancelを書かなくても勝手にやってくれる
package main

import (
    "context"
    "fmt"
    "log"

    "golang.org/x/sync/errgroup"
)

func main() {
    if err := doTask(); err != nil {
        log.Printf("error occured. %v", err)
    }
    log.Println("finished")
}

const concurrency = 2 // 最大同時並列実行数

var errFlag bool = true

func doTask() error {
    numbers := []int{1, 2, 3, 4, 5, 6}

    eg, ctx := errgroup.WithContext(context.Background())

    sem := make(chan struct{}, concurrency)
    defer close(sem)
    for _, num := range numbers {
        sem <- struct{}{} // チャネルに送信
        log.Printf("num: %d", num)

        n := num
        eg.Go(func() error {
            defer func() { <-sem }()
            log.Printf("goroutine num: %d", n)
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }
            if err := fnA(n); err != nil {
                return fmt.Errorf("failed to A, %v", err)
            }
            return nil
        })
    }

    if err := eg.Wait(); err != nil {
        return err
    }
    return nil
}

func fnA(n int) error {
    log.Println("do fnA.")
    if errFlag {
        log.Printf("--> failed to do fnA. num: %d", n)
        return fmt.Errorf("error A. num: %d", n)
    }
    log.Printf("--> succeeded to do fnA. num: %d", n)
    return nil
}

https://play.golang.org/p/ycVHbLn45b6

実行結果

2019/06/16 17:17:31 num: 1
2019/06/16 17:17:31 num: 2
2019/06/16 17:17:31 goroutine num: 2
2019/06/16 17:17:31 do fnA.
2019/06/16 17:17:31 --> failed to do fnA. num: 2
2019/06/16 17:17:31 goroutine num: 1
2019/06/16 17:17:31 num: 3
2019/06/16 17:17:31 goroutine num: 3
2019/06/16 17:17:31 num: 4
2019/06/16 17:17:31 goroutine num: 4
2019/06/16 17:17:31 num: 5
2019/06/16 17:17:31 goroutine num: 5
2019/06/16 17:17:31 num: 6
2019/06/16 17:17:31 goroutine num: 6
2019/06/16 17:17:32 error occured. failed to A, error A. num: 2
2019/06/16 17:17:32 finished

参考:

go言語でシグナルをきちんとエラーハンドリングする

関連

並行処理全般に関するメモは以下 go言語の並行処理 - ludwig125のブログ

go言語でsignalを適切に処理する方法を調べたので例をいくつか

シグナルを受け付けて関数を適切に終了させる例1

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {

    sigs := make(chan os.Signal, 1)
    ctx, cancel := context.WithCancel(context.Background())

    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    defer func() {
        // シグナルの受付を終了する
        signal.Stop(sigs)
        cancel()
    }()

    go func() {
        select {
        case sig := <-sigs: // シグナルを受け取ったらここに入る
            fmt.Println("Got signal!", sig)
            cancel() // cancelを呼び出して全ての処理を終了させる
        }
    }()

    if err := doTask(ctx); err != nil {
        fmt.Printf("failed to doTask: %v", err)
        cancel() // 何らかのエラーが発生した場合、他の処理も全てcancelさせる
        return
    }
    fmt.Println("done successfully.")
}

func doTask(ctx context.Context) error {
    defer fmt.Println("done doTask")
    for i := 0; i < 5; i++ {
        select {
        case <-ctx.Done():
            fmt.Println("received done")
            return ctx.Err()
        default:
        }
        // // エラー時の挙動が見たい場合はここのコメントアウトを外す
        // if i == 3 {
        //  return fmt.Errorf("error happened")
        // }

        // do something
        fmt.Println("sleep 1. count:", i)
        time.Sleep(1 * time.Second)
    }
    return nil
}

動作確認

順にそれぞれの挙動を見てみる

正常終了時

$go run signal3/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
sleep 1. count: 3
sleep 1. count: 4
done doTask
done successfully.

異常終了時(上のコメントアウトを外した時)

$go run signal3/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
done doTask
failed to doTask: error happened%                                                                                                               

Ctrl+Cをした時

$go run signal3/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
^CGot signal! interrupt
received done
done doTask
failed to doTask: context canceled%                                                                                                             
  • Ctrl+Cのとき、signalを受け取った後cancelされていることがわかる

シグナルを受け付けて関数を適切に終了させる例2(タスクの中身がgoroutine)

練習がてら、doTaskの中身をgoroutineにしてみた場合も考えてみた

package main

import (
    "fmt"
    "os"
    "os/signal"
    "time"

    "golang.org/x/net/context"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, os.Interrupt)
    defer func() {
        // シグナルの受付を終了する
        signal.Stop(sigs)
        cancel()
    }()
    go func() {
        select {
        case sig := <-sigs: // シグナルを受け取ったらここに入る
            fmt.Println("Got signal!", sig)
            cancel() // cancelを呼び出して全ての処理を終了させる
            return
        }
    }()

    res, err := doTask(ctx)
    for v := range res {
        fmt.Println("done successfully.", v)
    }
    for e := range err {
        fmt.Printf("failed to doTask: %v", e)
        cancel() // 何らかのエラーが発生した場合、他の処理も全てcancelさせる
        return
    }
}

func doTask(ctx context.Context) (<-chan string, <-chan error) {
    resCh := make(chan string)
    errCh := make(chan error, 5)
    go func() {
        defer fmt.Println("done doTask")
        defer close(resCh)
        defer close(errCh)
        for i := 0; i < 5; i++ {
            select {
            case <-ctx.Done():
                fmt.Println("received done")
                // Do something before terminated
                time.Sleep(500 * time.Millisecond)
                errCh <- ctx.Err()
                return
            default:
            }
            // // エラー時の挙動が見たい場合はここのコメントアウトを外す
            // if i == 3 {
            //  errCh <- fmt.Errorf("error happened")
            //  return
            // }

            // do something
            fmt.Println("sleep 1. count:", i)
            time.Sleep(time.Second)
        }
        resCh <- fmt.Sprintf("something")
    }()
    return resCh, errCh
}

動作確認

正常終了時

$go run signal5/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
sleep 1. count: 3
sleep 1. count: 4
done doTask
done successfully. something

異常終了時(上のコメントアウトを外した時)

$go run signal5/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
done doTask
failed to doTask: error happened%                                                                                                               

Ctrl+Cをした時

$go run signal5/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
^CGot signal! interrupt
received done
done doTask
failed to doTask: context canceled%                                                                                                             

問題なさそう

シグナルを受け付けて関数を適切に終了させる例3(独自のcontextを用意する)

こちらの記事で紹介されていたNewCtxを使ってみる Managing Groups of Goroutines in Go - The Startup - Medium

これは、signalを処理してcontextのcancelを送るctxを定義するもの

シグナルを受け付けるまで処理し続けるような場合(main側でエラー時にcancelを使う必要がない場合)では、上のコードがかなり減った

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "golang.org/x/net/context"
)

func main() {
    res, err := doTask(newCtx())
    for v := range res {
        fmt.Println("done successfully.", v)
    }
    for e := range err {
        fmt.Printf("failed to doTask: %v", e)
        return
    }
}

func newCtx() context.Context {
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        sCh := make(chan os.Signal, 1)
        signal.Notify(sCh, syscall.SIGINT, syscall.SIGTERM)
        <-sCh
        fmt.Println("Got signal!")
        cancel()
    }()
    return ctx
}

func doTask(ctx context.Context) (<-chan string, <-chan error) {
    resCh := make(chan string)
    errCh := make(chan error, 5)
    go func() {
        defer fmt.Println("done doTask")
        defer close(resCh)
        defer close(errCh)
        for i := 0; i < 5; i++ {
            select {
            case <-ctx.Done():
                fmt.Println("received done")
                // Do something before terminated
                time.Sleep(500 * time.Millisecond)
                errCh <- ctx.Err()
                return
            default:
            }

            // do something
            fmt.Println("sleep 1. count:", i)
            time.Sleep(time.Second)
        }
        resCh <- fmt.Sprintf("something")
    }()
    return resCh, errCh
}

実行結果

シグナル送った時

$go run signal_pattern/signal6/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
^CGot signal!
received done
done doTask
failed to doTask: context canceled

正常終了時

$go run signal_pattern/signal6/signal.go
sleep 1. count: 0
sleep 1. count: 1
sleep 1. count: 2
sleep 1. count: 3
sleep 1. count: 4
done doTask
done successfully. something