今回は Go 言語で「並列 (parallel) プログラミング」に挑戦してみましょう。なお、M.Hiroi はマルチコア CPU で動作するプログラムを作るのはこれが初めてです。初心者が作るプログラムなので、何か問題点や間違いがあるかもしれません。お気づきの点がありましたら、ご指摘いただけると助かります。
それでは実際に並列処理の効果を試してみましょう。「並行プログラミング」で作成したフィボナッチ関数を求めるプログラムで、CPU のコア数を 4 に増やします。
リスト : タイムアウト処理 (sample1401.go) package main import ( "fmt" "time" "runtime" ) // フィボナッチ関数 func fibo(n int) int { if n < 2 { return 1 } else { return fibo(n - 2) + fibo(n - 1) } } func main(){ runtime.GOMAXPROCS(4) // コア数の指定 ch := make(chan int, 5) for _, n := range []int{41, 41, 39, 35, 36} { go func(x int){ ch <- fibo(x) }(n) } for i := 5; i > 0; { select { case n := <- ch: fmt.Println(n) i-- case <- time.After(time.Second): fmt.Println("Timeout") i = 0 } } }
$ go run sample1401.go 14930352 24157817 102334155 267914296 267914296
コア数を 4 に増やすことで、すべての値をタイムアウトせずに求めることができました。
次は、フィボナッチ関数を使ってコア数を増やすと効率がどのくらい上がるか試してみましょう。
リスト : 並列処理の効率 (sample1402.go) package main import ( "fmt" "time" "runtime" ) // フィボナッチ関数 func fibo(n int) int { if n < 2 { return n } else { return fibo(n - 1) + fibo(n - 2) } } func main() { ch := make(chan int, 6) for n := 1; n <= 6; n++ { fmt.Println("-----", n, "-----") runtime.GOMAXPROCS(n) s := time.Now() for i := 0; i < 4; i++ { go func(){ ch <- fibo(40) }() } for i := 4; i > 0; i-- { fmt.Print(<- ch, " ") } e := time.Now().Sub(s) fmt.Println(e) } }
fibo(40) を 4 回計算しますが、1 回の計算を goroutine に割り当て、4 回の計算を並列に処理します。for ループの変数 n は使用するコア数を表します。n = 1 とすると逐次実行と変わりませんが、n の値を増やすと並列処理される goroutine の数も増えるので、4 個の goroutine がすべて終了するまでの所要時間 (Turn Around Time) は短縮されるはずです。
一般に、ある計算を p 台の演算装置で並列処理した場合の所要時間を T(p) とすると、並列処理の効果は次の式で表すことができます。
S(p) = T(1) / T(p) E(p) = S(p) / p
S(p) を高速化率 (Speed up ratio) とか加速率と呼び、E(p) を並列化効率と呼びます。理想的には S(p) = p, E(p) = 1 となり、これを ideal speed up といいます。実際には S(p) < p, E(p) < 1 になり、p を増やしていくと E(p) の値が下がっていく場合が多いようです。
それでは実行結果を示します。
表 : 実行結果 (時間 : 秒) 実行環境 : Ubuntu 22.04 (WSL2), Go 1.23.2, Intel Core i5-6200U 2.30GHz p : 時間 : S(p) : E(p) ---+------+------+------ 1 : 2.71 : 1.00 : 1.00 2 : 1.62 : 1.67 : 0.84 3 : 1.30 : 1.96 : 0.65 4 : 1.18 : 2.30 : 0.58 5 : 1.18 : 2.30 : 0.46 6 : 1.18 : 2.30 : 0.38
p を増やすと E(p) の値は下がっていきます。S(p) の値は p = 4 の 2.3 が最大で、それ以降は p の値を増やしても効果はほとんどありません。さすがに、実行速度は 1 / 4 にならなかったですね。それでも約 2 倍ちょっとの高速化ができるのですから、並列処理の効果はとても大きいことがわかります。
物理コア数が 4、ハイパースレッド *2 で、論理コア数は 8 になります。fibo(40) を 8 回計算した結果です。
表 : 実行結果 (時間 : 秒) 実行環境 : Windows 7, Go 1.2, Core i7-2670QM 2.20GHz p : 時間 : S(p) : E(p) ---+------+------+------ 1 : 8.04 : 1.00 : 1.00 2 : 4.18 : 1.92 : 0.96 3 : 2.88 : 2.79 : 0.93 4 : 2.17 : 3.71 : 0.93 5 : 2.02 : 3.78 : 0.76 6 : 1.87 : 4.30 : 0.72 7 : 1.75 : 4.59 : 0.65 8 : 1.62 : 4.96 : 0.62 9 : 1.64 : 4.90 : 0.54 10 : 1.64 : 4.90 : 0.49
E(p) は物理コア数の 4 までは 1 に近い値になりますが、それを超えると E(p) の値は下がっていきます。S(p) の値は p = 8 の 4.96 が最大で、それ以降は p の値を増やしても効果はほとんどありません。さすがに、実行速度は 1 / 8 にならなかったのですが、それでも約 5 倍の高速化ができるのですから、並列処理の効果はとても大きいことがわかります。
次は数値積分で円周率 \(\pi\) を求めてみましょう。区間 [a, b] の定積分 \(\int_a^b f(x)\,dx\) を数値的に求めるには、区間を細分して小区間の面積を求めて足し上げます。小区間の面積を求める一番簡単な方法は長方形で近似することです。この場合、3 つの方法が考えられます。
1 は左端の値 f(a) を、2 は右端の値 f(b) を、3 は中間点の値 f((a + b) / 2) を使って長方形の面積を計算します。この中で 3 番目の方法が一番精度が高く、これを「中点則」といいます。このほかに、台形で近似する「台形則」や、2 次近似で精度を上げる「シンプソン則」という方法があります。
それでは実際に、1 の方法と中点則で \(\pi\) の値を求めてみましょう。\(\pi\) は次の式で求めることができます。
プログラムは次のようになります。
リスト : 数値積分で円周率を求める (sample1403.go) package main import ( "fmt" "math" ) // 左端 func leftPoint(n int) float64 { w := 1.0 / float64(n) s := 0.0 for i := 0; i < n; i++ { x := float64(i) * w s += 4.0 / (1.0 + x * x) } return s * w } // 中点則 func midPoint(n int) float64 { w := 1.0 / float64(n) s := 0.0 for i := 1; i <= n; i++ { x := (float64(i) - 0.5) * w s += 4.0 / (1.0 + x * x) } return s * w } func main() { n := 10 for i := 1; i <= 9; i++ { fmt.Println("-----", n, "-----") pi := midPoint(n) fmt.Println(pi, math.Pi - pi) pi = leftPoint(n) fmt.Println(pi, math.Pi - pi) n *= 10 } }
関数 leftPoint は 1 の方法で、関数 midPoint が中点則でπの値を求めます。引数 n が分割数です。最初に小区間の幅を求めて変数 w にセットします。面積は変数 s にセットします。次の for ループで区間 [0, 1] を n 個に分割して面積を求めます。
最初に x 座標を計算します。leftPoint は float64(i) * w でいいのですが、midPoint は中間点を求めるため、変数 i を 1 から始めて、x 座標を次の式で求めます。
x := (float64(i) - 0.5) * w
たとえば、変数 i が 1 の場合は 0.5 になるので、x は区間 [0 * w, 1 * w] の中間点になります。あとは、4 / (1 + x * x) を計算して s に加算します。最後に s に w を掛け算して全体の面積を求めます。
実行結果を示します。
$ go run sample1403.go ----- 10 ----- 3.1424259850010987 -0.0008333314113055934 3.2399259889071588 -0.09833333531736566 ----- 100 ----- 3.1416009869231254 -8.333333332277704e-06 3.151575986923129 -0.009983333333335676 ----- 1000 ----- 3.1415927369231227 -8.333332957022321e-08 3.1425924869231245 -0.0009998333333314235 ----- 10000 ----- 3.141592654423134 -8.3334095180021e-10 3.14169265192314 -9.999833334672914e-05 ----- 100000 ----- 3.1415926535981615 -8.36841707041458e-12 3.1416026535731527 -9.99998335959873e-06 ----- 1000000 ----- 3.1415926535897643 2.886579864025407e-14 3.141593653589793 -9.999999996956888e-07 ----- 10000000 ----- 3.141592653589731 6.217248937900877e-14 3.141592753589987 -1.0000019390332682e-07 ----- 100000000 ----- 3.1415926535904264 -6.332712132461893e-13 3.141592663590225 -1.0000432038026474e-08 ----- 1000000000 ----- 3.1415926535899708 -1.7763568394002505e-13 3.1415926545896586 -9.998655237097864e-10
中点則の場合、分割数を 10 倍すると誤差はほぼ 1/100 になります。それに対し、1 の方法は分割数を 10 倍しても誤差は 1 / 10 にしかなりません。このように、1 の方法は分割数を増やさないと精度の高い値を求めることができません。
ただし、浮動小数点数 (float64) の計算には誤差があるので、精度には限界があります。中点則の場合、分割数を 1000000 より増やしても精度は高くなりません。1 の方法は分割数を増やすと誤差は少なくなりますが、実行時間がかかるようになります。そこで、並列処理を使って実行時間を短縮してみましょう。
並列化の考え方は簡単です。たとえば、4 つの goroutine で並列化するのであれば、区間を [0, 0.25], [0.25, 0.5], [0.5, 0.75], [0.75, 1] のように四等分して、それぞれの区間を 1 つの goroutine で並列に計算します。あとは、その値の足し算すればいいわけです。
プログラムは次のようになります。
リスト : 数値積分で円周率を求める (並列化, sample1404.go) package main import ( "fmt" "time" // "runtime" ) const ( N = 1000000000 W = 1.0 / float64(N) ) func leftPoint(n, m int) float64 { s := 0.0 for i := n; i < m; i++ { x := float64(i) * W s += 4.0 / (1.0 + x * x) } return s * W } func main() { // runtime.GOMAXPROCS(runtime.NumCPU()) デフォルトで論理コア数 4 に設定される for i := 1; i <= 8; i *= 2 { fmt.Println("-----", i, "-----") ch := make(chan float64, i) s := time.Now() k := N / i for j := 0; j < i; j++ { go func(n, m int) { ch <- leftPoint(n, m) }(j * k, (j + 1) * k) } sum := 0.0 for j := i; j > 0; j-- { sum += <- ch } e := time.Now().Sub(s) fmt.Println(sum) fmt.Println(e) } }
定数 N が分割数で、W が小区間の幅を表します。関数 leftPoint の引数 n, m は区間を表します。main では、並列で実行する goroutine を 1, 2, 4, 8 と増やして実行時間を計測します。for ループの変数 i が起動する goroutine の個数になります。あとは、go で匿名関数を実行して、その中で leftPoint を呼び出します。最後に、チャネルから結果を取り出して変数 sum に加算します。
----- 1 ----- 3.1415926545896586 1.517611247s ----- 2 ----- 3.1415926545897657 878.123269ms ----- 4 ----- 3.14159265458993 779.652373ms ----- 8 ----- 3.141592654589915 776.100571ms表 : 実行結果 (時間 : 秒) 実行環境 : Ubuntu 22.04 (WSL2), Go 1.23.2, Intel Core i5-6200U 2.30GHz N : 時間 : 効率 ---+--------+------ 1 : 1.518 : 1.00 2 : 0.878 : 1.72 4 : 0.780 : 1.94 8 : 0.776 : 1.95
それでは実行結果を示します。4 分割で約 1.9 倍の高速化に成功しました。
物理コア数が 4、ハイパースレッド *2 で、論理コア数は 8 になります。
表 : 実行結果 (時間 : 秒) 実行環境 : Windows 7, Go 1.2, Core i7-2670QM 2.20GHz N : 時間 : 効率 ---+--------+------ 1 : 14.294 : 1.00 2 : 7.340 : 1.95 4 : 3.766 : 3.80 8 : 2.005 : 7.13
8 分割で約 7 倍の高速化に成功しました。数値積分は並列処理との相性がいいようで、その効果はとても高いですね。この結果には M.Hiroi も驚きました。
乱数を使って数学や物理などの問題を解くアルゴリズムを「モンテカルロ法 (Monte Carlo methods)」といいます。簡単な例題として、円周率πをモンテカルロ法で求めてみましょう。
正方形の領域 (0 <= x < 1, 0 <= y < 1) に乱数で点を打ちます。乱数であれば点は領域内に一様に分布するので、x2 + y2 < 1 の円内に入る確率は π / 4 になります。つまり、(円内の点の個数 / 点の総数) の値は 0.7853... になるはずです。たくさん点を打つほど値は π / 4 に近づくはずですが、コンピュータの乱数は疑似乱数なので規則性が生じてしまい、値の精度にはどうしても限界があります。
また、たくさん点を打つと実行時間がかかるようになりますが、並列に処理することで実行時間を短縮することができます。たとえば、100 万個の点を打ってπを求める処理を並列に 8 回行って平均値を計算すれば、800 万個の点を打ってπを求めたことと同じになります。
このとき、goroutine ごとに異なる乱数列を発生するジェネレータを割り当てることがポイントです。Go 言語の場合、rand パッケージに新しい乱数ジェネレータを生成する関数が用意されています。
func NewSource(s int64) Source func New(s Source) *Rand
関数 NewSource は整数値 seed から乱数の元になるデータ Source を生成します。Source から乱数ジェネレータを生成する関数が New です。New は乱数ジェネレータ Rand へのポインタを返します。
一般に、乱数の元になるデータを「シード (seed : 種)」といいます。乱数ジェネレータを生成する seed の値を変えることで、異なる乱数列を発生させることができます。あとは、Rand に定義されているメソッドを使って乱数列を生成すればいいわけです。
簡単な例を示しましょう。
リスト : 乱数ジェネレータの生成 (sample1405.go) package main import ( "fmt" "math/rand" ) func main() { r1 := rand.New(rand.NewSource(1)) r2 := rand.New(rand.NewSource(2)) r3 := rand.New(rand.NewSource(3)) for i := 0; i < 20; i++ { fmt.Print(r1.Intn(100), " ") } fmt.Println("") for i := 0; i < 20; i++ { fmt.Print(r2.Intn(100), " ") } fmt.Println("") for i := 0; i < 20; i++ { fmt.Print(r3.Intn(100), " ") } fmt.Println("") }
$ go run sample1405.go 81 87 47 59 81 18 25 40 56 0 94 11 62 89 28 74 11 45 37 6 86 86 92 40 4 54 30 64 11 76 10 59 45 32 76 27 35 64 49 57 8 77 96 50 77 79 16 17 44 47 66 10 81 27 6 73 74 78 74 82
メソッド Intn(n) は 0 から n 未満の乱数 (int) を生成します。結果を見ればおわかりのように、NewSource に与える整数値を変えることで、異なる乱数列が生成されていることがわかります。
ちなみに、乱数を生成する関数 Int や Float64 などは、大域変数 globalRand に格納されている乱数ジェネレータを使っています。以前のバージョン (1.17.4) の場合、 globalRand は NewSource(1) に初期化されるので、Go 言語を起動して関数 Intn(100) で乱数を生成すると、乱数ジェネレータ r1 と同じ乱数列になります。
リスト : シードの変更 (sample1406.go) package main import ( "fmt" "math/rand" "time" ) func main() { for i := 0; i < 20; i++ { fmt.Print(rand.Intn(100), " ") } fmt.Println("") rand.Seed(time.Now().UnixNano()) for i := 0; i < 20; i++ { fmt.Print(rand.Intn(100), " ") } }
Go 1.17.4 での実行例を示します。
$ go run sample1406.go 81 87 47 59 81 18 25 40 56 0 94 11 62 89 28 74 11 45 37 6 17 67 37 38 17 7 42 43 59 25 22 46 6 96 73 67 24 86 38 40 $ go run sample1406.go 81 87 47 59 81 18 25 40 56 0 94 11 62 89 28 74 11 45 37 6 11 69 54 64 42 64 52 54 83 32 11 23 97 24 50 24 62 18 17 50 $ go run sample1406.go 81 87 47 59 81 18 25 40 56 0 94 11 62 89 28 74 11 45 37 6 70 10 64 93 79 44 77 7 53 61 27 55 29 79 61 54 11 36 9 61
time.Now() は現在時刻を求める関数で、UnixNano は January 1, 1970 UTC からの経過時間をナノ秒で返す関数です。Seed でシードの値を変えることで、大域的な乱数ジェネレータでも異なる乱数列を生成することができます。
現在のバージョン (1.23.2) の場合、globalRand は Go 言語を起動するたびに異なる値で初期化されるので、異なる乱数列が生成されます。Go 1.23.2 での実行例を示します。
$ go run sample1406.go 88 88 20 6 21 22 6 40 55 70 75 59 62 68 88 71 88 26 50 82 84 8 36 12 56 29 11 40 61 90 56 29 61 52 34 95 66 18 68 84 mhiroi@DESKTOP-FQK6237:~/golang$ $ go run sample1406.go 43 10 67 56 15 94 56 42 58 41 98 5 30 25 33 35 51 27 4 6 31 49 74 13 98 61 34 10 88 5 84 43 98 10 19 84 54 56 13 44 mhiroi@DESKTOP-FQK6237:~/golang$ $ go run sample1406.go 8 51 57 79 77 51 44 57 38 58 99 95 19 74 76 43 56 0 98 61 48 66 91 65 79 40 99 31 33 47 79 22 26 50 85 26 16 11 98 19
なお、globalRand は大域変数 (共有メモリ) なので、複数の goroutine からアクセスすることができます。このため、globalRand のアクセスには排他制御が行われています。並列処理で globalRand から多量の乱数を取得すると、排他制御に時間がかかるため、実行時間が遅くなる場合があります。ご注意くださいませ。
モンテカルロ法の並列処理は次のようになります。
リスト : モンテカルロ法(πを求める) package main import ( "fmt" "math/rand" "time" // "runtime" ) // モンテカルロ法 func montePi(n, s int) float64 { c := 0 r := rand.New(rand.NewSource(int64(s))) for i := n; i > 0; i-- { x := r.Float64() y := r.Float64() if x * x + y * y < 1.0 { c++ } } return (4.0 * float64(c)) / float64(n) } func main() { // runtime.GOMAXPROCS(runtime.NumCPU()) for i := 1; i <= 8; i *= 2 { fmt.Println("-----", i, "-----") n := 100000000 / i ch := make(chan float64, i) s := time.Now() for j := 0; j < i; j++ { go func(x int) { ch <- montePi(n, x) }(j + 1) } pi := 0.0 for j := i; j > 0; j-- { pi += <- ch } fmt.Println(pi / float64(i)) e := time.Now().Sub(s) fmt.Println(e) } }
試行回数を 100,000,000 とし、それを 1, 2, 4, 8 分割して実行時間を計測します。goroutine の返り値をチャネル ch で受け取り、その平均値を計算します。GOMAXPROCS の値は論理コア数の値 (4) になります。
モンテカルロ法の処理は関数 montePi で行います。引数 n が試行回数、s が乱数シードです。最初に rand.New(rand.NewSource(int64(s))) で乱数ジェネレータを生成します。次の for ループで n 個の点を打ちます。r.Float64() は [0, 1) の範囲の乱数 (float64) を生成します。点 (x, y) が円内に入っていれば c の値を +1 します。最後にπを計算して返します。
それでは実行結果を示します。
表 : 実行結果 (時間 : 秒) 実行環境 : Ubuntu 22.04 (WSL2), Go 1.23.2, Intel Core i5-6200U 2.30GHz N : 値 : 時間 : 効果 ---+------------+-------+------ 1 : 3.14151812 : 0.974 : 1.00 2 : 3.14163576 : 0.594 : 1.64 4 : 3.1414764 : 0.485 : 2.01 8 : 3.14167352 : 0.487 : 2.00
4 分割で 2 倍の高速化に成功しました。
物理コア数が 4、ハイパースレッド *2 で、論理コア数は 8 になります。
表 : 実行結果 (時間 : 秒) 実行環境 : Windows 7, Go 1.2, Core i7-2670QM 2.20GHz N : 値 : 時間 : 効果 ---+------------+-------+------ 1 : 3.14151812 : 5.967 : 1.00 2 : 3.14163576 : 3.109 : 1.92 4 : 3.1414764 : 1.710 : 3.49 8 : 3.14167352 : 1.027 : 5.81
8 分割で 5.8 倍の高速化に成功しました。モンテカルロ法の場合も、並列処理の効果はとても高いことがわかります。
次はクイックソートの並列化に挑戦してみましょう。クイックソートは枢軸を基準に区間を二分割し、分割した区間に再度クイックソートを適用します。分割した区間をソートするとき、他の区間には影響を及ぼさないので、二分割した区間を並列にソートすることが可能です。
この場合、多数の goroutine が生成されて、ソート完了を通知するため頻繁に通信を行うことになります。このように通信の頻度が多くなるものを「細粒度並列性 (fine-graind parallelism)」といいます。これに対し、数値積分やモンテカルロ法の関数 montePi の実行では通信を 1 回しか行いません。通信の頻度が少ないものを「粗粒度並列性 (coarse-grained parallelism)」といいます。粒度が細かくなると通信の頻度が多くなり、通信処理のオーバーヘッドにより効率は低下します。
そこで、区間の長さがある値以下になったら、逐次処理のクイックソートに切り替えることにします。なお、この処理を入れずにすべて並列処理で行うと、Go 言語ではランタイムエラーが発生します。ご注意くださいませ。
プログラムは次のようになります。
リスト : クイックソートの並列化 func quickSortParaSub(data SortI, low, high int) { if high - low < 1024 { quickSortSub(data, low, high) return } p := low + (high - low) / 2 i, j := low, high for { for data.Less(i, p) { i++ } for data.Less(p, j) { j-- } if i >= j { break } data.Swap(i, j) switch { case p == i: p = j case p == j: p = i } i++ j-- } ch := make(chan int, 2) go func(){ quickSortParaSub(data, low, i - 1) ch <- 0 }() go func(){ quickSortParaSub(data, j + 1, high) ch <- 0 }() <- ch <- ch } func quickSortPara(data SortI) { quickSortParaSub(data, 0, data.Len() - 1) }
quickSortPara は関数 quickSortParaSub を呼び出します。基本的な処理は拙作のページ「インターフェース」で作成した quickSortSub と同じです。最初に区間の長さをチェックして、1024 未満になったら逐次処理のクイックソート quickSortSub に切り替えます。この値は実行環境によって最適値が変わると思います。興味のある方はご自分の環境でいろいろ試してみてください。
枢軸を基準に区間を二分割したあと、go で匿名関数を並列に実行し、この中で quickSortParaSub を呼び出します。ソートが終了したらチャネル ch に 0 を送信します。あとはチャネル ch からデータを 2 回受信するまで待つだけです。
あとのプログラムは簡単なので説明は省略します。詳細はプログラムリストをお読みください。
実行結果は次のようになりました。
表 : 実行結果 (単位 : 秒) 実行環境 : Ubuntu 22.04 (WSL2), Go 1.23.2, Intel Core i5-6200U 2.30GHz データ数 : 逐次 : 並列 : 効率 ----------+-------+-------+------ 1,000,000 : 0.214 : 0.095 : 2.25 2,000,000 : 0.450 : 0.199 : 2.26 4,000,000 : 0.929 : 0.406 : 2.29 8,000,000 : 1.912 : 0.858 : 2.29
逐次処理よりも約 2 倍の高速化に成功しました。並列処理の効果は十分に出ていると思います。
物理コア数が 4、ハイパースレッド *2 で、論理コア数は 8 になります。
表 : 実行結果 (単位 : 秒) 実行環境 : Windows 7, Go 1.2, Core i7-2670QM 2.20GHz データ数 : 逐次 : 並列 : 効率 ----------+-------+-------+------ 1,000,000 : 0.361 : 0.109 : 3.31 2,000,000 : 0.740 : 0.215 : 3.44 4,000,000 : 1.520 : 0.419 : 3.63 8,000,000 : 3.153 : 0.849 : 3.71
逐次処理よりも約 3 倍強の高速化に成功しました。粒度が細かい分だけ数値積分やモンテカルロ法よりも効率は悪くなりますが、並列処理の効果は十分に出ていると思います。
次は順列の生成を並列処理で行ってみましょう。今回は 1 から n までの数字から n 個の数字を選ぶ順列を生成することにします。処理の分割は簡単で、先頭要素の数字で処理を分けるだけです。ようするに、1 から始まる順列、2 から始まる順列、...、n から始まる順列を別々の goroutine で並列に生成するわけです。
プログラムは次のようになります。
リスト : 順列の並列化 package main import ( "fmt" "time" // "runtime" ) // n と等しい要素があるか func member(n int, xs []int) bool { for _, x := range xs { if n == x { return true } } return false } // 順列の生成 func permutation(f func([]int), n, m, k int, xs []int) { if len(xs) == k { f(xs) } else { for i := n; i <= m; i++ { if !member(i, xs) { permutation(f, n, m, k, append(xs, i)) } } } } func main() { // runtime.GOMAXPROCS(runtime.NumCPU()) for i := 10; i <= 12; i++ { fmt.Println("-----", i, "-----") s := time.Now() // 逐次処理 func(){ c := 0 a := make([]int, 0, i) permutation(func(_ []int){ c++ }, 1, i, i, a) fmt.Println(c) }() e := time.Now().Sub(s) fmt.Println(e) s = time.Now() ch := make(chan int, i) // 並列処理 for j := 0; j < i; j++ { go func(i, j int){ c := 0 a := make([]int, 0, i) a = append(a, j + 1) permutation(func(_ []int){ c++ }, 1, i, i, a) ch <- c }(i, j) } sum := 0 for j := 0; j < i; j++ { sum += <- ch } fmt.Println(sum) e = time.Now().Sub(s) fmt.Println(e) } }
関数 permutation は n から m までの数字から k 個の数字を選ぶ順列を生成します。引数 xs が順列を格納する累積変数で、permutation を呼び出すとき、xs の先頭要素に数字を格納して渡せば、その数字から始まる順列を生成することができます。permutation に渡す匿名関数は、生成した順列をカウントするだけです。あとは、make でスライスを生成するとき、容量を指定することに注意してください。スライスの容量を動的に増加させると実行速度が遅くなってしまいます。
実行結果は次のようになりました。
表 : 実行結果 実行環境 : Ubuntu 22.04 (WSL2), Go 1.23.2, Intel Core i5-6200U 2.30GHz N : 逐次 : 並列 : 効率 ---+------------------+------ 10 : 0.652 : 0.275 : 2.27 11 : 7.798 : 3.203 : 2.43 12 : 102.089 : 41.680 : 2.44
2 倍ちょっとの高速化に成功しました。
物理コア数が 4、ハイパースレッド *2 で、論理コア数は 8 になります。
表 : 実行結果 実行環境 : Windows 7, Go 1.2, Core i7-2670QM 2.20GHz N : 逐次 : 並列 : 効率 ---+------------------+------ 10 : 0.944 : 0.235 : 4.02 11 : 11.518 : 2.374 : 4.85 12 : 154.397 : 34.520 : 4.47
4 倍以上の高速化に成功しました。並列化の効果は十分に出ていると思いますが、起動する goroutine の個数が NumCPU の個数よりも多くなるので、数値積分やモンテカルロ法よりも効率が悪くなるのは仕方がないところです。順列の生成はパズルの解法でよく使うので、並列処理はパズルの解法にも有効だと思われます。興味のある方はいろいろ試してみてください。
リスト : クイックソートの並列化 package main import ( "fmt" "math/rand" "time" // "runtime" ) type SortI interface{ Len() int Less(int, int) bool Swap(int, int) } func quickSortSub(data SortI, low, high int) { p := low + (high - low) / 2 i, j := low, high for { for data.Less(i, p) { i++ } for data.Less(p, j) { j-- } if i >= j { break } data.Swap(i, j) switch { case p == i: p = j case p == j: p = i } i++ j-- } if low < i - 1 { quickSortSub(data, low, i - 1) } if high > j + 1 { quickSortSub(data, j + 1, high) } } func quickSort(data SortI) { quickSortSub(data, 0, data.Len() - 1) } func quickSortParaSub(data SortI, low, high int) { if high - low < 1024 { quickSortSub(data, low, high) return } p := low + (high - low) / 2 i, j := low, high for { for data.Less(i, p) { i++ } for data.Less(p, j) { j-- } if i >= j { break } data.Swap(i, j) switch { case p == i: p = j case p == j: p = i } i++ j-- } ch := make(chan int, 2) go func(){ quickSortParaSub(data, low, i - 1) ch <- 0 }() go func(){ quickSortParaSub(data, j + 1, high) ch <- 0 }() <- ch <- ch } func quickSortPara(data SortI) { quickSortParaSub(data, 0, data.Len() - 1) } type IntArray []int func (ary IntArray) Len() int { return len(ary) } func (ary IntArray) Less(i, j int) bool { return ary[i] < ary[j] } func (ary IntArray) Swap(i, j int) { ary[i], ary[j] = ary[j], ary[i] } func main() { // runtime.GOMAXPROCS(runtime.NumCPU()) for i := 1; i <= 8; i *= 2 { a := make(IntArray, i * 1000000) b := make(IntArray, i * 1000000) for j := 0; j < i * 1000000; j++ { x := rand.Int() a[j] = x b[j] = x } fmt.Println("-----", i, "-----") s := time.Now() quickSort(a) e := time.Now().Sub(s) fmt.Println(e) s = time.Now() quickSortPara(b) e = time.Now().Sub(s) fmt.Println(e) } }