おんぶろぐ ver.2

おんぶろぐがインチキだらけなので改心しました

Goの並列化パターン - Context

課題

goroutine 開始後に処理を中断したいケースがある。全てのコールスタックに doneチャネルを連携すればこの実現はできるが、以下の様な操作は実現できない

  • 一部のコールスタックを即時中断
  • 一部のコールスタックにタイムアウトを設定
  • 一部のコールスタックに限定した情報管理

解決

Go 標準の context.Context を活用し中断処理を実現する。Context が提供する機能は以下の通り

  • 各種中断処理
  • Contextスコープの提供 (key-value形式のデータ)

また、context.Context は設計を通じて以下の価値も提供している

  • スコープを分割した並列操作

    • 同じContextを引き回せば同スコープ
    • 別のContextを生成すれば別スコープ
  • Context更新による上位スタックへの影響排除 (親 Contextを元に子 Contextを生成した場合)

    • 親ContextへのCancel操作は 子Contextに影響あり (Doneとなる)
    • 子ContextへのCancel操作は 親Contextに影響なし (Doneとならない)

実践

以下は context.Contextを用いて 2階層分の関数を呼び出している

  • 0階層 main
  • 1階層 runWithCtx1
  • 2階層 runWithCtx2

1階層目の runWithCtx1main から2並列 (goroutine#0, goroutine#1) で実行されるが、goroutine#1 は実行直後に main からキャンセルされている

2階層目の runWithCtx2 は 1秒間の仕事を5回繰り返す関数だが、この例では上位の runWithCtx1 が設定したタイムアウト 3秒 により終了を待たずにキャンセルされている

package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "strconv"
    "sync"
    "time"
)

func main() {
    fmt.Println("main Started")
    const goroutineNum = 2
    var wg sync.WaitGroup
    wg.Add(goroutineNum)
    for i := 0; i < goroutineNum; i++ {
        ctx, cancel := context.WithCancel(context.Background())
        go func(cxt context.Context, i int) {
            defer wg.Done()
            runWithCtx1(ctx, i)
        }(ctx, i)
        // goroutine #1 の場合のみ直後に cancel指定
        if i == 1 {
            cancel()
        }
    }
    wg.Wait()
    fmt.Println("main Finished")
}

func runWithCtx1(ctx1 context.Context, goroutineNo int) {
    funcName := "runWithCtx1"
    println(os.Stdout, goroutineNo, funcName, "Started")
    work(100 * time.Millisecond)
    if done(ctx1) {
        println(os.Stdout, goroutineNo, funcName, "Cancelled !!")
        return
    }
    // タイムアウトを3秒後に設定
    ctx2, _ := context.WithTimeout(ctx1, 3*time.Second)
    runWithCtx2(ctx2, goroutineNo)

    if err := ctx2.Err(); err != nil {
        println(os.Stdout, goroutineNo, funcName, "Failed because of:"+err.Error())
    }
    println(os.Stdout, goroutineNo, funcName, "Finished")
}

func runWithCtx2(ctx2 context.Context, goroutineNo int) {
    funcName := "runWithContext2"
    println(os.Stdout, goroutineNo, funcName, "Started")
    for i := 0; i < 5; i++ {
        if done(ctx2) {
            println(os.Stdout, goroutineNo, funcName, "Cancelled !!")
            return
        }
        println(os.Stdout, goroutineNo, funcName, "Continue Running "+strconv.Itoa(i))
        work(1 * time.Second)
    }
    println(os.Stdout, goroutineNo, funcName, "Finished")
}

func println(w io.Writer, goroutineNo int, funcName, message string) {
    fmt.Fprintf(w, "# %d %s %s \n", goroutineNo, funcName, message)
}

func done(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        return true
    default:
        return false
    }
}

var work = time.Sleep

実行すると 以下を確認できる

  1. goroutine #1 は実行直後に main 関数で実行したキャンセルにより runWithCtx1 (1層目) でキャンセルされている
  2. goroutine #0runWithCtx1 関数で設定したタイムアウトにより 3秒経過後に runWithCtx2 (2層目) でキャンセルされている
main Started
# 1 runWithCtx1 Started 
# 0 runWithCtx1 Started 
# 1 runWithCtx1 Cancelled !! // *1
# 0 runWithContext2 Started 
# 0 runWithContext2 Continue Running 0 
# 0 runWithContext2 Continue Running 1 
# 0 runWithContext2 Continue Running 2 
# 0 runWithContext2 Cancelled !!  // *2
# 0 runWithCtx1 Failed because of:context deadline exceeded 
# 0 runWithCtx1 Finished 
main Finished

Process finished with exit code 0

まとめ

この様に 各階層で別々のContextを用いることで 細かいコールスタックの制御が可能となっている

  • 1階層目は 並列稼働しているgoroutineを個別に制御できているし
    (これは個別にチャネルを用意すれば Doneチャネルでも実現可能)
  • 2階層目は 1階層目とは別のルールを適用できている
    (これはclone等の機能のない Doneチャネルでは実現が面倒)

こんな感じで良いのかな。間違っていたら教えて下さい。