goroutine と channel を復習する

いろいろ考えて golang を第 2 言語として全振りすることに決めたのでちゃんとやっていこうと思います。

わたしは web application をつくるのがナリワイなので非同期処理を並列に行うことが多い。したがって goroutine を使う場合も終了タイミングが読めない非同期処理を扱う場合が多いので、それを想定して goroutine の使い方を復習します。

参考

以下の記事を参考にしながらコードを書きつつ挙動などを把握しようと努めました。

goroutine に単純に非同期処理を逃した例

終了待ち忘れ

逃しているが goroutine 内の処理を完了を待たないので finish のみが出力されてしまう。

package main
import (
    "github.com/mmmpa/go_playground/easy"
    "log"
)
func main() {
    go async()
    go async()
    go async()
    log.Print("finish\n")
}
func async() {
    s := easy.RandomSecsSleep(3)
    log.Printf("sleep: %v\n", s)
}

sleep という log は出ない

2017/10/18 05:28:12 finish

channel で各 goroutine の終了までブロックする

channel は送信側でも受信側でもブロックが生じる挙動を持ちます (buffer が満杯になった場合。デフォルト buffer は 0 なので 1 つ受信するとそれが送信されるまでブロックされる)。今回は受信するまでプロセスをブロックする挙動を利用して goroutine の終了を待つことにします。

ch を作成し ch を goroutine の func に引数として渡しておくことで goroutin から ch への送信 (ch <-)が行なえます。メインプロセスでは ch からの受信 (<-ch)をまつことによりプロセスをブロックすることができます。

<-ch + <-ch + <-ch として 3 度の送信を待っているので 3 度の受信が起こるまで (すなわちすべての goroutine が終了に至るまで) メインプロセスはブロックされます。

package main
import (
    "github.com/mmmpa/go_playground/easy"
    "log"
    "time"
)
func main() {
    ch := make(chan time.Duration)
    go async(ch)
    go async(ch)
    go async(ch)
    total := &lt;-ch + &lt;-ch + &lt;-ch
    log.Printf("finish: total: %v\n", total)
}
func async(ch chan time.Duration) {
    // これは 1 - 3 秒ランダムにスリープする自前関数です
    s := easy.RandomSecsSleep(3)
    log.Printf("sleep: %v\n", s)
    ch &lt;- s
}

sleep という log が出る

2017/10/18 05:15:30 sleep: 1s
2017/10/18 05:15:30 sleep: 1s
2017/10/18 05:15:31 sleep: 2s
2017/10/18 05:15:31 finish: total: 4s

余談

js の async/await 時に Promise を先に走らせて await あとに置くことで並列実行する手法が似ていると思いました。

大量の処理を goroutine 量を決めて処理する

限界いっぱいまで goroutine を発行すると特に web application では他の request 分の能力も食いつぶしてしまってよくないのではないでしょうか?ということで goroutine 数を絞って使いまわす方法です。

今回のケースでは大量の URL リストがありそれらをゆるやかにダウンロードするという想定をします。一般的に http request にはタイムアウトを設定します。その上で一度に数百のリクエストを発行してしまうと自分のネットワークキャパシティを食いつぶし、リスト後半がことごとくタイムアウト扱いとなるということもありえます。そこで一度の接続本数をある程度絞りましょう、という想定です。

従来型の並列プログラミングでは fetch のみが存在し URL 取得と結果収集は mutex によるデータ保護下で行われていたと思います。それを channel による通信で行うのが golang way だと理解しています。

package main
import (
    "github.com/mmmpa/my_playground/easy"
    "log"
    "time"
    "sync"
)
func main() {
    // url を生成する自前関数です
    urls := easy.GenUrls(5)
    start := time.Now().UnixNano()
    maxWorkers := 2
    worker_in := provide(urls)
    worker_out := fetch(maxWorkers, worker_in)
    task_result := receive(worker_out)
    log.Printf(
        "finish: worker_total: %v, total: %v \n",
        &lt;-task_result,
        time.Duration(time.Now().UnixNano()-start),
    )
}
func receive(out chan time.Duration) chan time.Duration {
    re := make(chan time.Duration)
    go func() {
        defer close(re)
        total := time.Duration(0)
        defer func() { re &lt;- total }()
        for s := range out {
            total += s
        }
    }()
    return re
}
func provide(urls []string) chan string {
    in := make(chan string)
    go func() {
        defer close(in)
        for _, url := range urls {
            log.Printf("send: %v\n", url)
            in &lt;- url
        }
    }()
    return in
}
func fetch(maxWorkers int, in chan string) chan time.Duration {
    out := make(chan time.Duration)
    go func() {
        defer close(out)
        wg := sync.WaitGroup{}
        for i := 0; i &lt; maxWorkers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for url := range in {
                    log.Printf("start fetching: %v\n", url)
                    s := easy.RandomSecsSleep(3)
                    log.Printf("fetched: %v %v\n", url, s)
                    out &lt;- s
                }
                log.Print("in closed\n")
            }()
        }
        wg.Wait()
    }()
    return out
}
2017/10/21 06:05:12 send: url_1
2017/10/21 06:05:12 send: url_2
2017/10/21 06:05:12 send: url_3
2017/10/21 06:05:12 start fetching: url_2
2017/10/21 06:05:12 start fetching: url_1
2017/10/21 06:05:13 fetched: url_2 1s
2017/10/21 06:05:13 start fetching: url_3
2017/10/21 06:05:13 send: url_4
2017/10/21 06:05:14 fetched: url_1 2s
2017/10/21 06:05:14 start fetching: url_4
2017/10/21 06:05:14 send: url_5
2017/10/21 06:05:16 fetched: url_3 3s
2017/10/21 06:05:16 start fetching: url_5
2017/10/21 06:05:17 fetched: url_4 3s
2017/10/21 06:05:17 in closed
2017/10/21 06:05:17 fetched: url_5 1s
2017/10/21 06:05:17 in closed
2017/10/21 06:05:17 finish: worker_total: 10s, total: 5.000707599s

channel は関数内で生成する (結果的に goroutine も)

func provide(urls []string, in chan string) {
    defer close(in)
    for _, url := range urls {
        log.Printf("send: %v\n", url)
        in &lt;- url
    }
}

書き始めはこのような関数を用意し、メインプロセスで go provide(urls, in) としていました。channel を外部から注入していくスタイルですね。in を閉じられるとわかるのはこの関数のみなので defer close(in) がありますが、channel を供給するメインプロセス側では閉じる処理がどこにあるかが判然としません。

func provide(urls []string) chan string {
    in := make(chan string)
    go func() {
        defer close(in)
        for _, url := range urls {
            log.Printf("send: %v\n", url)
            in &lt;- url
        }
    }()
    return in
}

書き換えたあとの channel を返すスタイルの関数は本処理を 1 段ネストさせてしまう他、関数の呼び出し順序が縛られてしまうというデメリットがあります。しかし channel の発行場所で close を行えるというメリットがあります。close が外や他の関数にある場合、close closed channel などの事故を起こしてしまう可能性が高まりますので、安全性を考えると channel を返していくスタイルが良いと思いました。

しかし channel 注入型もメインプロセス上で channel と goroutine の関係性の見通しが良い感じもしますし、悩ましいところですね。

worker 用の channel を閉じる

func fetch(maxWorkers int, in chan string) chan time.Duration {
    out := make(chan time.Duration)
    go func() {
        defer close(out)
        wg := sync.WaitGroup{}
        for i := 0; i &lt; maxWorkers; i++ {
            wg.Add(1)
            // worker 本体
            go func() {
                defer wg.Done()
                for url := range in {
                    log.Printf("start fetching: %v\n", url)
                    s := easy.RandomSecsSleep(3)
                    log.Printf("fetched: %v %v\n", url, s)
                    out &lt;- s
                }
                log.Print("in closed\n")
            }()
        }
        wg.Wait()
    }()
    return out
}

worker が結果を送信する channel は複数の goroutine からの送信を受けつけるため、すべての goroutine が終わったのを確認してから閉じなければなりません。すべての worker が終わったことは sync.WaitGroup を使うことによって簡単に検知できますが、もう 1 段ネストが深くなってしまうのが難と言えば難です。また wg.Wait() 用の関数もまた goroutine にしなければなりません。

receive の終了

func receive(out chan time.Duration) chan time.Duration {
    re := make(chan time.Duration)
    go func() {
        defer close(re)
        total := time.Duration(0)
        defer func() { re &lt;- total }()
        for s := range out {
            total += s
        }
    }()
    return re
}

こうして供給、worker の channel を適切に閉じていくことにより、結果を収集する receiver も channel の close を見るだけで処理を終了することができるようになります。書き始めた当初はそのキビがわからず、len(urls) 回 for を回した後に終了するという処理になっていました。

今回の学習はここまで

Contextselect を用いた中断などもコードを書いて確認したのですが、疲れたので今回はここまで。