Goの並列化パターン - Context
課題
goroutine 開始後に処理を中断したいケースがある。全てのコールスタックに doneチャネルを連携すればこの実現はできるが、以下の様な操作は実現できない
- 一部のコールスタックを即時中断
- 一部のコールスタックにタイムアウトを設定
- 一部のコールスタックに限定した情報管理
解決
Go 標準の context.Context
を活用し中断処理を実現する。Context
が提供する機能は以下の通り
また、context.Context は設計を通じて以下の価値も提供している
スコープを分割した並列操作
- 同じContextを引き回せば同スコープ
- 別のContextを生成すれば別スコープ
Context更新による上位スタックへの影響排除 (親 Contextを元に子 Contextを生成した場合)
- 親ContextへのCancel操作は 子Contextに影響あり (Doneとなる)
- 子ContextへのCancel操作は 親Contextに影響なし (Doneとならない)
実践
以下は context.Contextを用いて 2階層分の関数を呼び出している
- 0階層
main
- 1階層
runWithCtx1
- 2階層
runWithCtx2
1階層目の runWithCtx1
は main
から2並列 (goroutine#0, goroutine#1) で実行されるが、goroutine#1 は実行直後に main
からキャンセルされている
2階層目の runWithCtx2
は 1秒間の仕事を5回繰り返す関数だが、この例では上位の runWithCtx1
が設定したタイムアウト 3秒
により終了を待たずにキャンセルされている
package main import ( "context" "fmt" "io" "os" "strconv" "sync" "time" ) func main() { fmt.Println("main Started") const goroutineNum = 2 var wg sync.WaitGroup wg.Add(goroutineNum) for i := 0; i < goroutineNum; i++ { ctx, cancel := context.WithCancel(context.Background()) go func(cxt context.Context, i int) { defer wg.Done() runWithCtx1(ctx, i) }(ctx, i) // goroutine #1 の場合のみ直後に cancel指定 if i == 1 { cancel() } } wg.Wait() fmt.Println("main Finished") } func runWithCtx1(ctx1 context.Context, goroutineNo int) { funcName := "runWithCtx1" println(os.Stdout, goroutineNo, funcName, "Started") work(100 * time.Millisecond) if done(ctx1) { println(os.Stdout, goroutineNo, funcName, "Cancelled !!") return } // タイムアウトを3秒後に設定 ctx2, _ := context.WithTimeout(ctx1, 3*time.Second) runWithCtx2(ctx2, goroutineNo) if err := ctx2.Err(); err != nil { println(os.Stdout, goroutineNo, funcName, "Failed because of:"+err.Error()) } println(os.Stdout, goroutineNo, funcName, "Finished") } func runWithCtx2(ctx2 context.Context, goroutineNo int) { funcName := "runWithContext2" println(os.Stdout, goroutineNo, funcName, "Started") for i := 0; i < 5; i++ { if done(ctx2) { println(os.Stdout, goroutineNo, funcName, "Cancelled !!") return } println(os.Stdout, goroutineNo, funcName, "Continue Running "+strconv.Itoa(i)) work(1 * time.Second) } println(os.Stdout, goroutineNo, funcName, "Finished") } func println(w io.Writer, goroutineNo int, funcName, message string) { fmt.Fprintf(w, "# %d %s %s \n", goroutineNo, funcName, message) } func done(ctx context.Context) bool { select { case <-ctx.Done(): return true default: return false } } var work = time.Sleep
実行すると 以下を確認できる
goroutine #1
は実行直後にmain
関数で実行したキャンセルによりrunWithCtx1
(1層目) でキャンセルされているgoroutine #0
はrunWithCtx1
関数で設定したタイムアウトにより 3秒経過後にrunWithCtx2
(2層目) でキャンセルされている
main Started # 1 runWithCtx1 Started # 0 runWithCtx1 Started # 1 runWithCtx1 Cancelled !! // *1 # 0 runWithContext2 Started # 0 runWithContext2 Continue Running 0 # 0 runWithContext2 Continue Running 1 # 0 runWithContext2 Continue Running 2 # 0 runWithContext2 Cancelled !! // *2 # 0 runWithCtx1 Failed because of:context deadline exceeded # 0 runWithCtx1 Finished main Finished Process finished with exit code 0
まとめ
この様に 各階層で別々のContextを用いることで 細かいコールスタックの制御が可能となっている
- 1階層目は 並列稼働しているgoroutineを個別に制御できているし
(これは個別にチャネルを用意すれば Doneチャネルでも実現可能) - 2階層目は 1階層目とは別のルールを適用できている
(これはclone等の機能のない Doneチャネルでは実現が面倒)
こんな感じで良いのかな。間違っていたら教えて下さい。