「並行 (concurrent) プログラミング」は複数のプログラムを並行に実行しますが、このとき複数の CPU で同時に動かす場合と、ひとつの CPU で複数のプログラムを動かす場合があります。一般的には、前者を「並列 (parallel) プログラミング」といい、複数のハードウェアを並列に実行することによる処理速度の向上が主な目的となります。
後者の場合、一定時間毎に実行するプログラムを切り替えることで、複数のプログラムを並列に実行しているかのように見せることができます。この処理を「時分割 (time sharing)」もしくは「タイム・スライス (time slice)」といいます。
一般に、タイム・スライスは OS でサポートされている機能です。OS が実行するプログラムのことを「プロセス (process)」または「タスク (task)」といいます。Julia ではコルーチンのことをタスクというので、本稿ではプロセスと記述することにします。
並列的に動作するプログラムをひとつのプロセスだけで作るのはけっこう大変です。そこで、プロセス内では逐次的な処理にとどめ、複数のプロセス間で情報交換を行うことにより、全体で並列的な動作を実現することを考えます。このほうが自然にプログラムを記述できる場合があるのです。これが後者の主な目的となります。
プロセスは互いに独立したプログラムですが、OS にはプロセス間でデータをやり取りする機能 (プロセス間通信) が用意されています。たとえば、UNIX ライクな OS では「パイプ (pipe)」を使って複数のプログラム (コマンド) を連結することができます。
この場合、パイプを通してデータがプログラムに送られ、各プログラムは並行に動作することになります。入出力処理で待たされるコマンドがあったとしても、その間に他のコマンドが実行されるので、各コマンドを逐次的に実行するよりも、効率的に処理することが可能です。
最近では、ひとつのプログラムの中で独立した複数の処理を記述できるようになりました。この機能を「スレッド (thread)」とか「マルチスレッド」いいます。スレッドは「縫い糸」という意味ですが、プログラムでは「制御の流れ」という意味で使われています。並列的な動作をプログラムする場合、逐次的な処理をひとつのスレッドに割り当て、複数のスレッドを並行に動作させることにより、全体で並列的な動作を実現するわけです。
一般的なスレッドは、一定時間毎に実行するスレッドを強制的に切り替えます。このとき、スレッドのスケジューリングは処理系が行います。これを「プリエンプティブ (preemptive)」といいます。
コルーチンの場合、プログラムの実行は一定時間ごとに切り替わるものではなく、プログラム自身が実行を中断しないといけません。これを「ノンプリエンプティブ (nonpreemptive)」といいます。
また、スレッドは同じプロセス内に存在するので、メモリ空間を共有することができます。これを「共有メモリ」といいます。スレッド間の通信は共有メモリを使って簡単に行うことができますが、プリエンプティブなスレッドの場合、共有メモリのアクセス時に発生する「競合」が問題になります。このため、マルチスレッドをサポートしているプログラミング言語では、競合を回避するための仕組みが用意されています。
今回はコルーチンを使って擬似的なマルチプロセスを実現するプログラムを作ってみましょう。このプログラムはコルーチンを利用したノンプリエンプティブなものなので、競合について考慮する必要はありません。ただし、複数のプロセス間で通信を行うので、待ち合わせが必要になることがあります。この処理を「同期 (synchronization)」といいます。並行プログラミングの場合、通信と同期という 2 つの処理が重要になります。
それではプログラムを作りましょう。一般に、プロセスには優先順位 (プライオリティ) があるのですが、今回はすべてのプロセスは同じ優先順位とします。Julia の場合、タスクとスケジューラを使うと簡単にマルチプロセスを実現することができます。
関数 schedule() はタスクをスケジューラのキューに追加して、システムがアイドルなときにタスクを実行します。スケジューラーはメインのプログラムと並行に動作していると考えてください。このため、メインのプログラムが終了すると、スケジューラで実行されているタスクも終了します。ご注意ください。
簡単な実行例を示します。
リスト : 簡単なマルチプロセス function test1(name, n) while n > 0 println("$name $n") n -= 1 yield() end false end t1 = @task test1("foo", 5) t2 = @task test1("bar", 6) t3 = @task test1("baz", 4) schedule(t1) schedule(t2) schedule(t3) wait(t2)
関数 test1() は name を n 回表示します。name と n を表示したあと、yield() でタスクの実行を中断して制御をスケジューラに戻します。スケジューラはキューをチェックして実行可能なタスクがあれば、それをキューから取り出して実行します。これで他のプロセスが実行されるので、複数のプロセスを並行に動作させることができます。
一番最後の wait() は、引数がタスクの場合、それが終了するまでプログラムの実行を一時停止します。これがないとメインのプログラムが先に終了するので、スケジューラに登録したタスクは実行されません。
実行例を示します。
foo 5 bar 6 baz 4 foo 4 bar 5 baz 3 foo 3 bar 4 baz 2 foo 2 bar 3 baz 1 foo 1 bar 2 bar 1
このように、他のプロセスと通信を行わない場合、プログラムはとても簡単になります。
次はプロセス間で通信を行う場合を考えてみましょう。この場合、いろいろな方法が考えられますが、今回は Julia に用意されている「Channel (チャネル)」というデータ型を使ってみましょう。チャネルの生成はコンストラクタ Channel() を使います。
Channel{T}(size::Int) # T 型のチャネル (大きさ size) Channel(size::Int) # Any 型のチャネル (大きさ size)
チャネルの基本はキューと同じですが、データを追加する関数 put!() とデータを取り出す関数 take!() で待ち合わせを行うところが異なります。put!() では、キューが満杯のときに待ち合わせを行います。逆に take!() の場合、キューが空のときに待ち合わせを行います。キューの大きさが少ない場合でも、データを書き込むプロセスと読み出すプロセスが並行に動作することで、キューの大きさ以上のデータを受け渡すことができます。
それでは簡単な実行例を示します。次のリストを見てください。
リスト : チャネルの実行例 # チャネル const ch = Channel(4) # データを送る function send_color(color, n) for i in 1 : n put!(ch, color) end end # データを受け取る function receive_color(n) for i in 1 : n println(take!(ch)) end end # テスト schedule(@task send_color("red", 8)) schedule(@task send_color("blue", 8)) schedule(@task send_color("yellow", 8)) t1 = @task receive_color(24) schedule(t1) wait(t1)
Channel(4) でチャネルを生成して大域変数 ch に格納します。大きさは 4 とします。send_color() はデータ (color) を n 個チャネルに書き込みます。receive_color() は n 個のデータをチャネルから取り出して表示します。テストは、キューに書き込むプロセスを 3 つ生成し、取り出すプロセスを 1 つ生成します。データを書き込む総数は 8 * 3 = 24 個なので、取り出すデータ数も 24 個とします。
それでは実行結果を示します。
red red red red red blue yellow red blue yellow red blue yellow red blue yellow blue yellow blue yellow blue yellow blue yellow
24 個のデータすべて表示されていますね。
最後に、「哲学者の食事」という並行プログラミングでは有名な問題を解いてみましょう。
5 人の哲学者が丸いテーブルに座っています.テーブルの中央にはスパゲッティが盛られた大皿があり、哲学者の間には 5 本のフォークが置かれています。哲学者は思索することとスパゲッティを食べることを繰り返します。食事のときには 2 本のフォークを持たなければなりません。食事が終わると 2 本のフォークを元の位置に戻します。
詳しい説明は 食事する哲学者の問題 -- Wikipedia をお読みください。
それではプログラムを作りましょう。最初にフォークを操作する関数を定義します。
リスト : フォークを操作する関数 # フォーク const forks = [true, true, true, true, true] # フォークの番号を求める function forknum(person, side) n = person if side == :left n = n + 1 if n > length(forks) n = 1 end end n end # フォークがあるか? function isfork(person, side) forks[forknum(person, side)] end # フォークの書き換え function forkset(person, side, val) forks[forknum(person, side)] = val end # フォークを取る function getfork(person, side) # 待ち合わせ while true if isfork(person, side) break end yield() end forkset(person, side, false) sleep(0.1) end # フォークを置く function putfork(person, side) forkset(person, side, true) sleep(0.1) end
フォークの有無は真偽値で表して、それを配列に格納します。配列は大域変数 forks にセットします。n 番目の哲学者の場合、右側のフォークがベクタの n 番目の要素、左側のフォークが n + 1 番目の要素になります。関数 forknum() はフォークに対応する番号を返します。関数 isfork() は n 番目の哲学者の side にフォークがあるとき真を返します。forkset() は n 番目の哲学者の side 側にあるフォークの値を val に書き換えます。
getfork() はフォークを取る関数です。while ループで isfork() が真を返すまで待ちます。このとき、yield() を呼び出して他のプロセスに実行権を渡すことをお忘れなく。そのあとで、forkset() で対応するフォークの値を false() に書き換えます。putfork() はフォークを元に戻す関数です。forkset() でフォークの値を true に書き換えます。
getfork() と putfork() は最後に関数 sleep() でプログラムの実行を休止します。引数の単位は秒 (sec) です。ミリ秒は浮動小数点数で指定してください。なお、スケジューラに登録されたプログラムで sleep() を使用すると、実行権は他のプログラムに移ります。
今回はノンプリエンプティブなコルーチンを使用しているので、forks をアクセスするときに競合は発生しません。プリエンプティブなマルチスレッドを使用する場合、共有メモリにアクセスするときは競合について考慮する必要があります。ご注意ください。
次は哲学者の動作をプログラムします。次のリストを見てください。
リスト : 哲学者の動作 function person0(n) m = 0 while m < 2 println("Philosopher $n is thinking") sleep(0.5) getfork(n, :right) getfork(n, :left) println("Philosopher $n is eating") sleep(0.5) putfork(n, :right) putfork(n, :left) m = m + 1 end println("Philosopher $n is sleeping") end
関数 person0() の引数 n は哲学者の番号を表します。変数 m は食事をした回数です。2 回食事をしたら処理を終了します。食事をする場合、最初に getfork() で右側のフォークを取り、次に左側のフォークを取ります。食事を終えたら putfork() で右側のフォークを置き、次に左側のフォークを置きます。thinking と eating のあとは sleep() でプログラムの実行を休止します。
このように、マルチプロセスを使うと簡単にプログラムできますが、実は並行プログラミング特有の大きな問題点があるのです。これはプログラムを実行してみるとわかります。
プログラムの実行は関数 test() で行います。
リスト : 実行 function test(person) ps = map(x -> Task(() -> person(x)), [1, 2, 3, 4, 5]) for p in ps schedule(p) end for p in ps wait(p) end end test(person0)
map() でプロセスを生成し、それを schedule() でスケジューラに登録します。あとは、wait() でプロセスが終了するのを待つだけです。実行結果は次のようになります。
Philosopher 1 is thinking Philosopher 2 is thinking Philosopher 3 is thinking Philosopher 4 is thinking Philosopher 5 is thinking
このように、すべてのプロセスが待ち状態となり進むことができなくなります。これを「デッドロック (deadlock)」といいます。この場合、1 番目の哲学者の右側のフォークは、5 番目の哲学者の左側のフォークになります。各哲学者が右側のフォークを取り、左側のフォークが置かれるのを待つときにデッドロックとなるわけです。
デッドロックを防止する簡単な方法は、右側のフォークを取っても左側のフォークを取れないときは、右側のフォークを元に戻すことです。プログラムは次のようになります。
リスト : デッドロックの防止 (1) function person1(n) m = 0 while m < 2 println("Philosopher $n is thinking") sleep(0.5) getfork(n, :right) if isfork(n, :left) getfork(n, :left) println("Philosopher $n is eating") sleep(0.5) putfork(n, :right) putfork(n, :left) m = m + 1 else putfork(n, :right) end end println("Philosopher $n is sleeping") end
右側のフォークを取ったあと、isfork() で左側のフォークをチェックします。フォークがある場合は、forkset() で値を true に書き換えます。これでフォークを取って食事をすることができます。左側のフォークがない場合は putfork() で右側のフォークを元に戻します。
実行結果は次のようになります。
Philosopher 1 is thinking Philosopher 2 is thinking Philosopher 3 is thinking Philosopher 4 is thinking Philosopher 5 is thinking Philosopher 1 is thinking Philosopher 2 is thinking Philosopher 3 is thinking Philosopher 4 is thinking Philosopher 5 is eating Philosopher 2 is thinking Philosopher 3 is thinking Philosopher 4 is eating Philosopher 5 is thinking Philosopher 1 is eating Philosopher 3 is eating Philosopher 4 is thinking Philosopher 5 is eating Philosopher 1 is thinking Philosopher 2 is thinking Philosopher 3 is thinking Philosopher 4 is eating Philosopher 5 is sleeping Philosopher 1 is thinking Philosopher 2 is eating Philosopher 4 is sleeping Philosopher 1 is eating Philosopher 2 is thinking Philosopher 3 is eating Philosopher 1 is sleeping Philosopher 2 is eating Philosopher 3 is sleeping Philosopher 2 is sleeping
このように、デッドロックしないで実行することができます。
もうひとつ簡単な方法を紹介しましょう。奇数番目の哲学者は、まず左側のフォークを取り上げてから右側のフォークを取り、偶数番目の哲学者は、今までのように右側のフォークを取り上げてから左側のフォークを取ります。こんな簡単な方法で動作するのは不思議なように思います。たとえば、哲学者が 2 人の場合を考えてみてください。
哲学者 0 の右側のフォークを A、左側のフォークを B とします。哲学者 1 からみると、B が右側のフォークで、A が左側のフォークになります。デッドロックは、哲学者 0 が A を取り、哲学者 1 が B を取ったときに発生します。ここで、哲学者 1 が左側のフォーク A から取るようにします。先に哲学者 0 が A を取った場合、哲学者 1 は A があくまで待つことになるので、哲学者 0 はフォーク B を取って食事をすることができます。哲学者 1 が先にフォーク A を取った場合も同じです。これでデッドロックを防止することができます。
プログラムは次のようになります。
リスト : デッドロックの防止 (2) function person2(n) m = 0 while m < 2 println("Philosopher $n is thinking") sleep(0.5) if n % 2 == 0 getfork(n, :right) getfork(n, :left) else getfork(n, :left) getfork(n, :right) end println("Philosopher $n is eating") sleep(0.5) putfork(n, :right) putfork(n, :left) m = m + 1 end println("Philosopher $n is sleeping") end
if 文で n が偶数の場合は右側から、奇数の場合は左側のフォークから取るように処理を分けるだけです。
実行結果は次のようになります。
Philosopher 1 is thinking Philosopher 2 is thinking Philosopher 3 is thinking Philosopher 4 is thinking Philosopher 5 is thinking Philosopher 3 is eating Philosopher 5 is eating Philosopher 3 is thinking Philosopher 5 is thinking Philosopher 1 is eating Philosopher 4 is eating Philosopher 1 is thinking Philosopher 4 is thinking Philosopher 3 is eating Philosopher 5 is eating Philosopher 2 is eating Philosopher 3 is sleeping Philosopher 5 is sleeping Philosopher 4 is eating Philosopher 2 is thinking Philosopher 1 is eating Philosopher 4 is sleeping Philosopher 1 is sleeping Philosopher 2 is eating Philosopher 2 is sleeping
正常に動作していますね。興味のある方はいろいろ試してみてください。
今度は Julia で「並列 (parallel) プログラミング」に挑戦してみましょう。なお、M.Hiroi は Julia もそうですが並列プログラミングに関しても初心者にすぎません。なにぶんにも初心者が作るプログラムなので、何か問題点や間違いがあるかもしれません。お気づきの点がありましたら、ご指摘いただけると助かります。
なお、並列プログラミングの基本は拙作のページ「Julia の基礎知識: 並列プログラミング」で簡単に説明していますので、そちらをお読みくださいませ。
乱数を使って数学や物理などの問題を解くアルゴリズムを「モンテカルロ法 (Monte Carlo methods)」といいます。簡単な例題として、円周率πをモンテカルロ法で求めてみましょう。
正方形の領域 (0 <= x < 1, 0 <= y < 1) に乱数で点を打ちます。乱数であれば点は領域内に一様に分布するので、x2 + y2 < 1 の円内に入る確率は π / 4 になります。つまり、(円内の点の個数 / 点の総数) の値は 0.7853... になるはずです。たくさん点を打つほど値は π / 4 に近づくはずですが、コンピュータの乱数は疑似乱数なので規則性が生じてしまい、値の精度にはどうしても限界があります。
また、たくさん点を打つと実行時間がかかるようになりますが、並列に処理することで実行時間を短縮することができます。たとえば、100 万個の点を打ってπを求める処理を並列に 4 回行って平均値を計算すれば、400 万個の点を打ってπを求めたことと同じになります。
Julia の場合、関数 rand() で乱数を取得することができます。Julia の rand() は高機能です。
rand([rng][, S][, dims...])
rng は乱数発生ジェネレータです。省略すると大域変数に格納されたジェネレータが使用されます。S はデータ型やコレクション (['a', 'b', 'c'] とか 1 : 10 など) を指定します。データ型 S が整数の場合、乱数の範囲は [typemin(S), typemax(S)] になります。浮動小数点数の場合は [0, 1.0) (0 以上 1.0 未満) の範囲になります。dims を指定すると、dims で指定した大きさの配列に乱数を格納して返します。
簡単な例を示しましょう。
julia> for _ = 1 : 10; println(rand()); end 0.5939723464136937 0.024220850690386264 0.648525189990292 0.22083957896326245 0.1309902585960312 0.47506648591383205 0.41566489255438244 0.3515387127792815 0.9575547284952766 0.7399959494399366 julia> rand(3, 3) 3×3 Matrix{Float64}: 0.527818 0.643676 0.54425 0.839322 0.285294 0.957029 0.711791 0.590617 0.504498 julia> for _ = 1 : 10; println(rand(1 : 100)); end 77 5 29 22 70 57 44 2 54 15 julia> rand(1 : 100, 4, 4) 4×4 Matrix{Int64}: 92 16 62 95 94 100 17 66 58 9 52 76 79 49 42 33
乱数発生ジェネレータは Random.MersenneTwister() で生成することができます。乱数を生成するアルゴリズムは名前が表すようにメルセンヌ・ツイスターが使われています。
MersenneTwister([seed])
一般に、乱数の元になるデータを「シード (seed : 種)」といいます。乱数ジェネレータを生成する seed の値を変えることで、異なる乱数列を発生させることができます。簡単な例を示しましょう。
julia> using Random julia> r0 = MersenneTwister(0) MersenneTwister(0) julia> rand(r0, 10) 10-element Vector{Float64}: 0.8236475079774124 0.9103565379264364 0.16456579813368521 0.17732884646626457 0.278880109331201 0.20347655804192266 0.042301665932029664 0.06826925550564478 0.3618283907762174 0.9732164043865108 julia> r1 = MersenneTwister(1) MersenneTwister(1) julia> rand(r1, 10) 10-element Vector{Float64}: 0.23603334566204692 0.34651701419196046 0.3127069683360675 0.00790928339056074 0.4886128300795012 0.21096820215853596 0.951916339835734 0.9999046588986136 0.25166218303197185 0.9866663668987996 julia> r = MersenneTwister() MersenneTwister(0x91f74efb9f8c7d42666b19182e6be477) julia> rand(r, 10) 10-element Vector{Float64}: 0.3086901529652917 0.6615834335922925 0.8714187982961843 0.8418738079768595 0.3248667496253246 0.4861284029002404 0.6494751066055828 0.028181345773853117 0.6482128164123016 0.22540065700091594
seed を省略すると、Julia が何らかの方法で seed を生成するようです。
Julia の場合、大域変数は各ワーカープロセスごとに用意されます。大域的な乱数ジェネレータも同じです。シードを関数 srand(seed) で設定すれば、ワーカープロセスごとに異なる乱数列を発生することができますが、今回は乱数の例題もかねて、プロセスごとにジェネレータを生成することにします。
プログラムは次のようになります。Julia を起動するときはオプション -p 数値 をお忘れなく。今回は -p 4 を指定して起動しました。
リスト : モンテカルロ法 @everywhere using Random @everywhere function monte_pi(n, s) rng = MersenneTwister(s) c = 0 for _ = 1 : n x = rand(rng) y = rand(rng) if x * x + y * y < 1.0 c += 1 end end 4.0 * c / n end function test_monte_pi(n) for i in [1, 2, 4] println("---- $i ----") m = div(n, i) p = @time @distributed (+) for s in 1 : i monte_pi(m, s) end println(p / i) end end
julia> test_monte_pi(100000000) ---- 1 ---- 2.020121 seconds (535.31 k allocations: 35.862 MiB, 37.65% compilation time) 3.14161456 ---- 2 ---- 0.952704 seconds (845 allocations: 57.320 KiB, 1.27% compilation time) 3.1416066799999998 ---- 4 ---- 1.016437 seconds (595 allocations: 35.719 KiB) 3.14147464 実行環境 : Julia ver 1.10.5, Ubuntu 22.04 (WSL2), Intel Core i5-6200U 2.30GHz
2 分割で約 2 倍の高速化を達成できました。M.Hiroi のパソコンの CPU (Intel Core i5-6200U 2.30GHz) の物理コア数は 2 で、1 コアにつきハイパースレッディングで 2 分割することができます。Julia の場合、物理コア数を超えると並列化の効果は少なくなるようです。興味のある方はいろ試してみてください。
リスト : 哲学者の食事 # フォーク const forks = [true, true, true, true, true] # フォークの番号を求める function forknum(person, side) n = person if side == :left n = n + 1 if n > length(forks) n = 1 end end n end # フォークがあるか? function isfork(person, side) forks[forknum(person, side)] end # フォークの書き換え function forkset(person, side, val) forks[forknum(person, side)] = val end # フォークを取る function getfork(person, side) # 待ち合わせ while true if isfork(person, side) break end yield() end forkset(person, side, false) sleep(0.1) end # フォークを置く function putfork(person, side) forkset(person, side, true) sleep(0.1) end # 哲学者の動作 function person0(n) m = 0 while m < 2 println("Philosopher $n is thinking") sleep(0.5) getfork(n, :right) getfork(n, :left) println("Philosopher $n is eating") sleep(0.5) putfork(n, :right) putfork(n, :left) m = m + 1 end println("Philosopher $n is sleeping") end function person1(n) m = 0 while m < 2 println("Philosopher $n is thinking") sleep(0.5) getfork(n, :right) if isfork(n, :left) getfork(n, :left) println("Philosopher $n is eating") sleep(0.5) putfork(n, :right) putfork(n, :left) m = m + 1 else putfork(n, :right) end end println("Philosopher $n is sleeping") end function person2(n) m = 0 while m < 2 println("Philosopher $n is thinking") sleep(0.5) if n % 2 == 0 getfork(n, :right) getfork(n, :left) else getfork(n, :left) getfork(n, :right) end println("Philosopher $n is eating") sleep(0.5) putfork(n, :right) putfork(n, :left) m = m + 1 end println("Philosopher $n is sleeping") end function test(person) ps = map(x -> Task(() -> person(x)), [1, 2, 3, 4, 5]) for p in ps schedule(p) end for p in ps wait(p) end end #test(person0) #test(person1) test(person2)