『入門 Goプログラミング』UNIT 7 並行プログラミング 読書メモ | Go日記
『入門 Goプログラミング』UNIT 7 並行プログラミング 読書メモ
・ゴルーチンの起動。
・通信のためにチャンネルを使う。
・チャンネルのパイプラインに対する理解。
この入門書の特徴として、最初に素朴な(言い方は悪いが「鈍臭い」)例や、間違った考え方から出発して、それらを洗練したり訂正したりしていく課程で学習し理解を深めるような構成になっている。
今回の並行プログラミングの学習については書籍の流れに概ね従う。しかし、書籍の中で使われている「gopher 工場」のたとえまでこのnoteに持ち込む必要もないのでそういう部分は省いた。
前置き
並行タスク
数多くの並行したタスクが自分の仕事をしながら、
なにか共通の目的に向けて通信を行い、
情報を交換する。
Go では、(並行に)独立して実行される個々のタスクを「ゴルーチン goroutine 」と呼びます。
ゴルーチンの起動
必要なのは関数呼び出しの前の go というキーワードを置くだけ。
main 関数がリターンするとき、そのプログラムのすべてのゴルーチンは即座に停止する。→ このことから、素朴に実装された最初のサンプルプログラムは、main 関数側で十分に待ってあげることによってゴルーチンの実行を見届けている。
※ この記事に載せるコードは書籍内サンプルそのままではない。
main.go
package main
import (
"fmt"
"time"
)
func main() {
// ()をつけた呼び出しに go キーワードをつける
go sleepyG()
// 十分に待つ
time.Sleep(4 * time.Second)
fmt.Println("Done")
}
func sleepyG() {
time.Sleep(2 * time.Second)
fmt.Println("... snore ...")
}
$ go build
$ ./getProgrammingGoUnit7Goroutine
... snore ...
Done
複数のゴルーチン
go キーワードを使うたびに新しいゴルーチンが1つ起動される。すべてのゴルーチンは同時に実行されるだろうと期待するかもしれないが必ずしもそうとは限らない。どのような順序で実行されるかは環境に依存する。だからいつも「各ゴルーチンの処理はどんな順序で実行されるかわからない」と想定するのがベスト。
package main
import (
"fmt"
"time"
)
func main() {
for i := 0; i < 5; i++ {
go sleepyG(i)
}
// 十分に待つ
time.Sleep(4 * time.Second)
fmt.Println("Done")
}
func sleepyG(i int) {
fmt.Println("goroutine", i)
time.Sleep(2 * time.Second)
fmt.Println("... snore ...", i)
}
実行例
goroutine 4
goroutine 0
goroutine 1
goroutine 2
goroutine 3
... snore ... 3
... snore ... 1
... snore ... 2
... snore ... 4
... snore ... 0
Done
これまでのコードの「十分に待つ」と戦略は簡単に破綻する。例えばゴルーチンの先で他のシステムからのレスポンスを待つ必要がある場合など。
すべてのゴルーチンが終了したときにそれを知る術が必要。それがチャネル channel 。
チャネル
チャネルはゴルーチン to ゴルーチンで値を送るためのもの。
チャネルの作成には組み込みの make を使う。
c := make(chan int) // 整数の送受信ができるチャネル
チャネルで値を送受信するには左向き矢印演算子 <- を使う。
値を送信するには、矢印の左側にチャネル式を書きます。矢印の右にある値が、そのチャネルに流れて行くように書くわけです。
c <- 99
チャネルから値を受信するとき、矢印はチャネルから出てきます(チャネルの左側に矢印を置きます)。
r := <-c // チャネル c から受信し、受信した値を r に代入
次のサンプルコードは、
・1つのチャネルを作成し
・5つのゴルーチンに渡し
・個々のゴルーチンがそれぞれ1個送信する、計5個のメッセージ受信を待つ。
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan int)
for i := 0; i < 5; i++ {
go sleepyG(i, c)
}
for i := 0; i < 5; i++ {
gID := <-c
fmt.Println("receive", gID)
}
}
func sleepyG(i int, c chan int) {
time.Sleep(2 * time.Second)
fmt.Println("... snore ...", i)
c <- i
}
実行例
... snore ... 3
... snore ... 4
... snore ... 1
... snore ... 0
receive 3
receive 4
receive 1
receive 0
... snore ... 2
receive 2
select-case
生成されるすべてのチャネルが同質の役割を担うものだけで構成されるとは限らないので、役割が異なった複数のチャネルを扱うために select 構文が用意されている。
例は、
・ランダムにスリープする goroutine からの応答を扱うチャネル c
・一定時間が経過した後に値を受信できるチャネルを返す time.After 関数が生成するチャネル
の2つを扱うサンプルコードである。
package main
import (
"fmt"
"time"
"math/rand"
)
func main() {
c := make(chan int)
for i := 0; i < 5; i++ {
go sleepyG(i, c)
}
timeout := time.After(3 * time.Second)
for i := 0; i < 5; i++ {
select {
case gID := <-c:
fmt.Println("receive", gID)
case <-timeout:
fmt.Println("Timeout!")
return
}
}
}
func sleepyG(i int, c chan int) {
time.Sleep(time.Duration(rand.Intn(4000)) * time.Millisecond)
fmt.Println("... snore ...", i)
c <- i
}
実行例
... snore ... 2
receive 2
... snore ... 3
receive 3
... snore ... 4
receive 4
Timeout!
タイムアウトに間に合わなかったゴルーチン(実行例では gID が 0 と 1 のゴルーチン)のタスクは、main からの retuen によって破棄される。
nil チャネル
チャネルは make によって明示的に作成する必要があるが、まだ make されていないチャネルはゼロ値として nil を持っている。
nil のチャネルを使おうとしてもパニックにはならない(代わりにその演算(送信または受信)は永遠にブロックされる)。ただし、nil チャネルを close しようとするとパニックになる。
nil チャネルが役に立つシチュエーションの説明がコラムにあったが、サンプルコードはなく、いまひとつピンとこなかったので解説記事を検索してみた。
v, ok := <-ch パターンを使って ok なうちは v を利用する。
!ok になったらそれは ch のタスクが終了したってことだから ch を nil にセットしてやる。nil チャネルを使うだけだったらパニックにはならない。ch が nil かどうかを select の外側でチェックしてやることで終了した ch の処理をスキップする。
コードを写経してみてやっと意味がわかった。
package main
import (
"fmt"
"time"
"math/rand"
)
func main() {
a := asChan(1, 3, 5, 7)
b := asChan(2, 4, 6, 8)
c := merge(a, b)
for v := range c {
fmt.Println(v)
}
}
func asChan(vs ...int) <-chan int {
c := make(chan int)
go func() {
for _, v := range vs {
c <- v
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
close(c)
}()
return c
}
func merge(a, b <-chan int) <-chan int {
c := make(chan int)
go func() {
defer close(c)
for a != nil || b != nil {
select {
case v, ok := <-a:
if !ok {
fmt.Println("a is done")
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok {
fmt.Println("b is done")
b = nil
continue
}
c <- v
}
}
}()
return c
}
ブロック/デッドロック
あるゴルーチンがチャネルで送信または受信を待っているとき、そのゴルーチンは「ブロックされている」と表現します。
ゴルーチンは自分に対するブロックが解除されるのをただひたすら静かに待っている。
1個または複数のゴルーチンが永遠に発生するはずのないものによってブロックされた状態が「デッドロック」。デッドロックを予防することは理論上では困難だが、プログラマが単純なガイドラインに沿ってプログラミングすれば防ぐことが可能(そんなに難しい話ではない)。
ゴルーチンのパイプライン
ゴルーチンの完了を空文字で知らせる素朴な例。
package main
import (
"fmt"
)
func main() {
cBang := make(chan string)
cFilter := make(chan string)
go bang(cBang)
go filterWorker(cBang, cFilter)
printer(cFilter)
}
func bang(downstream chan string) {
for _, v := range []string{"Task1", "Task2", "E", "Task3"} {
downstream <- v
}
downstream <- ""
}
func filterWorker(upstream, downstream chan string) {
for {
item := <-upstream
if item == "" {
downstream <- ""
return
}
if item != "E" {
downstream <- item
}
}
}
func printer(upstream chan string) {
for {
v := <-upstream
if v == "" {
return
}
fmt.Println(v)
}
}
実行例
Task1
Task2
Task3
// E が除外されている
この素朴な例には問題がある。
空文字を終了の意味で使ったが、処理したい文字列群の中に空文字が含まれていたとしたらコードはそれを「終了」の意味で捉えてしまう!
→ もうこれ以上チャネルに値を送らないことを close(c) で示すことができる。
チャネルがクローズされたかのチェックは v, ok := <-c パターンを使う。
close を使用した書き方に変更したコードがこちら。
package main
import (
"fmt"
)
func main() {
cBang := make(chan string)
cFilter := make(chan string)
go bang(cBang)
go filterWorker(cBang, cFilter)
printer(cFilter)
}
func bang(downstream chan string) {
for _, v := range []string{"Task1", "Task2", "E", "Task3"} {
downstream <- v
}
close(downstream)
}
func filterWorker(upstream, downstream chan string) {
for {
item, ok := <-upstream
if !ok {
close(downstream)
return
}
if item != "E" {
downstream <- item
}
}
}
func printer(upstream chan string) {
for {
v, ok := <-upstream
if !ok {
return
}
fmt.Println(v)
}
}
「close されるまでチャネルから読む」というパターンは一般的なので range でショートカットできる。
package main
import (
"fmt"
)
func main() {
cBang := make(chan string)
cFilter := make(chan string)
go bang(cBang)
go filterWorker(cBang, cFilter)
printer(cFilter)
}
func bang(downstream chan string) {
for _, v := range []string{"Task1", "Task2", "E", "Task3"} {
downstream <- v
}
close(downstream)
}
func filterWorker(upstream, downstream chan string) {
for item := range upstream {
if item != "E" {
downstream <- item
}
}
close(downstream)
}
func printer(upstream chan string) {
for v := range upstream {
fmt.Println(v)
}
}
競合状態
複数のゴルーチンで1つの map を更新していくような設計にすると競合状態が発生する。この競合を回避するためにミューテックスが登場する。
ミューテックス
相互排他 mutual exclusion の略で mutex 。
ゴルーチン同士が同時に何かを行うのを防ぐために使えるのがミューテックス。ミューテックスが持つ相互排他性は、守ろうとしている対象について必ずそれ(ミューテックス)が使われるという事実に支えられている。
ミューテックスには Lock と Unlock という2つのメソッドがある。
ミューテックスを正しく使うには、必ず次のことを徹底しなければならない。すなわち、
・共有値にアクセスするコードは、まずそのミューテックスをロックする
・それから必要な処理を行い
・その後でミューテックスをアンロックすること
このパターンに従わないコードが紛れ込むと競合状態が発生する危険が高まるので、ミューテックスでなにを保護するのかはパッケージ内に隠されることが多い。
ミューテックスは sync パッケージの中にある。
初期化は不要。ゼロ値はアンロックされたミューテックス。
package main
import (
"sync"
)
var mu sync.Mutex
func main() {
mu.Lock()
defer mu.Unlock()
// この関数からリターンするまでロックされる
}
ガイドライン
・ミューテックスをロックしている間のコードは単純にしておく
・共有状態ひとつにつき、ミューテックスを1個だけにする
ーーー と、『入門 Goプログラミング』では mutex の解説は程々に、という感じ。
少し物足りなさを感じたので、もう一冊の参考書籍:『Go言語による並行処理』 単行本(ソフトカバー) – 2018/10/26 Katherine Cox-Buday (著), 山口 能迪 (翻訳) 3.2.2 MutexとRWMutex を読んでみた。
MutexとRWMutex
整数型の共有変数を並行にインクリメント/デクリメントする例。
package main
import (
"fmt"
"sync"
)
func main() {
var cnt int
var lock sync.Mutex
inc := func() {
lock.Lock()
defer lock.Unlock()
cnt++
fmt.Printf("Incrementing %d\n", cnt)
}
dec := func() {
lock.Lock()
defer lock.Unlock()
cnt--
fmt.Printf("Decrementing %d\n", cnt)
}
var wg sync.WaitGroup
// increment
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
inc()
}()
}
// decriment
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
dec()
}()
}
wg.Wait()
fmt.Println("Complete")
}
実行例
Decrementing -1
Incrementing 0
Incrementing 1
Incrementing 2
Incrementing 3
Incrementing 4
Decrementing 3
Decrementing 2
Decrementing 1
Decrementing 0
Complete
defer の中に Unlock メソッドを入れ込むことによって、たとえ panic になっても確実に呼び出される。呼び出しに失敗してしまうとデッドロックが発生する。
クリティカルセクション ーーー プログラムが共有リソースに対する排他的アクセスを必要とする場所のこと。上のサンプルコードで言えば cnt 共有変数 ーーー への出入りはコストが高く付く。クリティカルセクションで消費される時間は最小限にしたい。そのためのひとつの戦略は、クリティカルセクションの「断面積」を減らすこと。複数の並行処理で共有するメモリに対する Read とWrite が必ずしも両方必要でないとき、sync.RWMutex の使用が検討できる。
sync.RWMutex は概念的には sync.Mutex と同じだが、よりやることが細かく、メモリに対する Read のロックを要求した場合、ロックが Write で保持されてなければアクセスを得ることができる。
書籍では別の例と出しているが、ここでは先程のインクリメント/デクリメントのコードに sync.RWMutex を組み込んでみた。(動かしてみて、例として適切でないと実感。しかし、学習の一環なのでそのまま載せる。)
package main
import (
"fmt"
"sync"
"time"
"math/rand"
)
func main() {
var cnt int
var lock sync.RWMutex
inc := func() {
lock.Lock()
defer lock.Unlock()
time.Sleep(time.Duration(rand.Intn(3000)) * time.Millisecond)
cnt++
fmt.Println("Incrementing")
}
dec := func() {
lock.Lock()
defer lock.Unlock()
time.Sleep(time.Duration(rand.Intn(3000)) * time.Millisecond)
cnt--
fmt.Println("Decrementing")
}
repo := func() {
lock.RLocker()
// defer lock.Unlock() /* DO NOT DO THIS */
time.Sleep(time.Duration(rand.Intn(7000)) * time.Millisecond)
fmt.Printf("cnt %d\n", cnt)
}
var wg sync.WaitGroup
// increment
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
inc()
}()
}
// decriment
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
dec()
}()
}
// report
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
repo()
}()
}
wg.Wait()
fmt.Println("Complete")
}
実行してみると、
Incrementing
cnt 1
Incrementing
cnt 2
cnt 2
cnt 2
cnt 2
Incrementing
Incrementing
Incrementing
Decrementing
Decrementing
Decrementing
Decrementing
Decrementing
Complete
このような出力になる。
cnt 変数へ変更を加える inc() と dec() は Lock() と Unlock() の組み合わせのままだが、新しく加えた repo() は cnt の値を読むだけなので RLocker() メソッドを使用し、Unlock() を避けている。
感想
並行処理、奥が深いなぁと思う。『入門Goプログラミング』の説明は浅いのだが「手を動かしてみよう」重視でエントリーにはとても良い。この note でも最後は『Go言語による並行処理』からの解説+例で補ったが、『Go言語による並行処理』でいうと実践編の最初の内容であり、まだまだ先がある。これからも引き続き並行処理についてまずは基本を読み込んでいきたい。
SN