並行プログラミングの続きです。今回は簡単な例題として「エラトステネスの篩」と「哲学者の食事」を取り上げます。
go ブロックとチャネルを使って素数を求めるプログラムを作ってみましょう。考え方は簡単です。最初に、2 から始まる整数列を生成するチャネルを用意します。この場合、チャネルは「遅延ストリーム」のように機能 [*1] します。ここではチャネルのことを「ストリーム」と呼ぶことにしましょう。
最初に、2 から始まる整数列を生成するストリームを作ります。これは go ブロックとチャネルを使えば簡単に作成することができます。2 は素数なので、この整数列から 2 で割り切れる整数を取り除き除きます。ここでも go ブロックを使って、入力ストリームから 2 で割り切れる整数を取り除いたストリームを返すフィルターを作ります。
2 で割り切れる整数が取り除かれたので、次の要素は 3 になります。今度は 3 で割り切れる整数を取り除けばいいのです。これもフィルターを使えば簡単です。このとき、入力用のストリームは 2 で割り切れる整数が取り除かれています。したがって、このストリームに対して 3 で割り切れる整数を取り除くようにフィルターを設定すればいいわけです。
このように、素数を見つけたらそれで割り切れる整数を取り除いていくアルゴリズムを「エラトステネスの篩」といいます。ようするに、2 から始まる整数ストリームに対して、見つけた素数 2, 3, 5, 7, 11, ... を順番にフィルターで設定して素数でない整数をふるい落としていくわけです。
次はストリーム用の高階関数 stream-filter を定義します。
リスト : ストリーム用のフィルター (sieve.clj) (require '[clojure.core.async :refer [chan close! go-loop <!! <! >!] :as a]) ;; 整数列の生成 (defn make-ints [low high] (let [ch (chan)] (go-loop [low low] (if (<= low high) (do (>! ch low) (recur (inc low))) (close! ch))) ch)) ;; フィルター (defn stream-filter [pred ch] (let [och (chan)] (go-loop [] (let [item (<! ch)] (if item (do (when (pred item) (>! och item)) (recur)) (close! och)))) och))
関数 stream-filter はストリーム ch の要素に述語 pred を適用し、真を返す要素をストリーム och に書き込みます。ch がクローズして nil を返したら、och をクローズして go-loop を終了します。あとは特に難しいところはないと思います。
それでは実際に試してみましょう。
user=> (load-file "sieve.clj") ... 略 ... user=> (<!! (a/into [] (make-ints 1 10))) [1 2 3 4 5 6 7 8 9 10] user=> (<!! (a/into [] (stream-filter even? (make-ints 1 10)))) [2 4 6 8 10] user=> (<!! (a/into [] (stream-filter odd? (make-ints 1 10)))) [1 3 5 7 9]
最後に素数を求める関数 stream-sieve を作ります。
リスト : エラトステネスの篩 (defn stream-sieve [n] (let [och (chan)] (go-loop [ich (make-ints 2 n)] (let [x (<! ich)] (if x (do (>! och x) (if (<= (* x x) n) (recur (stream-filter #(not (zero? (mod % x))) ich)) (recur ich))) (close! och)))) och))
関数 stream-sieve は n 以下の素数を生成するストリームを返します。最初に、素数を生成するストリーム och を chan で作ります。次に、go-loop の中で、2 から n まで生成するストリームを make-ints で作って変数 ich にセットします。ループ本体で ich から素数を取り出してストリーム och に書き込みます。
要素 x が √n より大きければ、残りの要素はすべて素数です。そうでなければ、x と割り切れる要素を取り除くフィルターを設定します。この処理を関数 stream-filter で行います。そして、ストリーム ich の値を stream-filter の返り値 (ストリーム) で更新します。これで ich から要素を読み込むと、stream-filter の中で書き換える前の in からデータが読み込まれ、x で割り切れる要素を取り除くことができます。
それでは実行してみましょう。
user=> (<!! (a/into [] (stream-sieve 100))) [2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97] user=> (<!! (a/into [] (stream-sieve 1000))) [2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199 211 223 227 229 233 239 241 251 257 263 269 271 277 281 283 293 307 311 313 317 331 337 347 349 353 359 367 373 379 383 389 397 401 409 419 421 431 433 439 443 449 457 461 463 467 479 487 491 499 503 509 521 523 541 547 557 563 569 571 577 587 593 599 601 607 613 617 619 631 641 643 647 653 659 661 673 677 683 691 701 709 719 727 733 739 743 751 757 761 769 773 787 797 809 811 821 823 827 829 839 853 857 859 863 877 881 883 887 907 911 919 929 937 941 947 953 967 971 977 983 991 997]
正常に動作していますね。
次は「哲学者の食事」という並行プログラミングでは有名な問題を解いてみましょう。
5 人の哲学者が丸いテーブルに座っています。テーブルの中央にはスパゲッティが盛られた大皿があり、哲学者の間には 5 本のフォークが置かれています。哲学者は思索することとスパゲッティを食べることを繰り返します。食事のときには 2 本のフォークを持たなければなりません。食事が終わると 2 本のフォークを元の位置に戻します。
詳しい説明は 食事する哲学者の問題 -- Wikipedia をお読みください。
それではプログラムを作りましょう。5 人の哲学者を 5 つの go ブロックで表します。それから、フォークを管理する go ブロックを 1 つ作ります。これをサーバーとして使います。哲学者はサーバーにフォークを要求します。サーバーは要求されたフォークがあれば使用を許可し、なければ不許可とします。哲学者はフォークがなければ一定時間待機して、再度フォークを要求することにしましょう。
まず最初に、必要となるデータ構造を定義します。
リスト : フォークのリクエスト ;; req :get, :ret ;; fork 0 - 4 ;; rpley 応答用チャネル (defrecord Req [req fork reply])
レコード Req はフォークのリクエストを表します。フィールド変数 req にリクエストの種類、fork にフォークの番号、reply は応答用のチャネルです。リクエストの種類はキーワード :get と :ret で表します。 :get はフォークの取得、:ret はフォークの返却を表します。
次はフォークを管理するサーバー forks を作ります。
リスト : フォークの管理 (defn forks [n sch] (let [fork-table (for [_ (range n)] (atom true))] (go-loop [] (let [{:keys [req fork reply]} (<! sch)] (cond (= req :get) (if @(nth fork-table fork) (do (swap! (nth fork-table fork) not) (>! reply true)) (>! reply false)) (= req :ret) (do (swap! (nth fork-table fork) not) (>! reply true))) (recur)))))
引数 n はフォークの本数、sch はリクエストを受け付けるチャネルです。最初にフォークの有無を表す配列 fork-table を作って (atom true) で初期化します。それから、次の go-loop (無限ループ) でリクエストを受け付けます。チャネル sch からリクエストを取り出して、分配束縛を使って各変数にセットします。
req が :get ならば、fork が fork-table にあるか @(nth fork-table fork) をチェックします。値が true であれば fork があるので、swap! で false に書き換えてから、チャネル reply に true を送信します。フォークがなければ false を送信します。req が :ret の場合はフォークを返却するだけなので、(nth fork-table fork) を true に書き換えてから、reply に true を送信します。
次はフォークを取得する関数 get-fork を作ります。
リスト : フォークの取得 (defn get-fork [fork sch] (let [ich (chan) req (->Req :get fork ich)] (go-loop [] (>! sch req) (if (<! ich) (do (<! (timeout 100)) fork) (do (<! (timeout 500)) (recur))))))
関数 get-fork はフォーク fork をサーバーに要求します。->Req でリクエストを生成して変数 req にセットします。そして、go-loop 中で req をチャネル sch に送信し、その応答をチャネル ich で受け取ります。true の場合はフォークの使用許可がおりたので、準備時間として 100 msec 待ったあと fork を返します。不許可の場合は 500 msec 待ったあと再度メッセージを送信します。
次はフォークを返却する関数 ret-fork を作ります。
リスト : フォークの返却 (defn ret-fork [fork sch] (let [ich (chan)] (go (<! (timeout 100)) (>! sch (->Req :ret fork ich)) (<! ich))))
ret-fork は fork を返す準備時間として 100 msec 待ったあと、チャネル sch にフォークを返却するリクエストを送信します。そして、チャネル ich からの応答結果 (true) を返します。
次は哲学者の動作をプログラムします。次のリストを見てください。
リスト : 哲学者の動作 (defn person [m rfork lfork sch quit] (go-loop [n 2] (if (zero? n) (do (printf "Philosopher%d is sleeping\n" m) (flush) (>! quit true)) (do (printf "Philosopher%d is thinking\n" m) (flush) (<! (timeout 1000)) (<! (get-fork rfork sch)) (<! (get-fork lfork sch)) (printf "Philosopher%d is eating\n" m) (flush) (<! (timeout 500)) (<! (ret-fork rfork sch)) (<! (ret-fork lfork sch)) (recur (dec n))))))
関数 person の引数 m は哲学者の番号を表します。rfork が右側のフォーク、lfork が左側のフォークです。次の go-loop で、哲学者が食事を取る回数だけ処理を繰り返します。哲学者が食事をする場合、最初に get-fork で右側のフォークを取り、次に左側のフォークを取ります。食事を終えたら ret-fork で右側のフォークを返却し、次に左側のフォークを返却します。get-fork と ret-fork を実行するときは、<! を使って結果を受け取ります。これで動作が終了するまでパークすることができます。
このように、go ブロックを使うと簡単にプログラムできますが、実は並行プログラミング特有の大きな問題点があるのです。これはプログラムを実行してみるとわかります。
最後にプログラムを実行する関数 main を作ります。
リスト : 実行 (defn -main [] (let [sch (chan) quit (chan)] (forks 5 sch) (person 1 0 1 sch quit) (person 2 1 2 sch quit) (person 3 2 3 sch quit) (person 4 3 4 sch quit) (person 5 4 0 sch quit) (dotimes [_ 5] (<!! quit)))) }
最初に、リクエストを受け付けるチャネル sch と、終了通知を受け付けるチャネル quit を生成します。それから (forks 5 csh) でサーバーを起動して、関数 person で 5 人の哲学者を起動します。フォークは整数 0, 1, 2, 3, 4 で表しています。哲学者は円形に並んでいるので、5 人目の左側のフォークが 1 人目の右側のフォークになります。あとは、最後の dotimes で 5 人の哲学者が終了するのを待つだけです。
実行結果は次のようになります。
$ clj -M -m ph0 Philosopher1 is thinking Philosopher4 is thinking Philosopher5 is thinking Philosopher2 is thinking Philosopher3 is thinking <-- CTRL-C を入力
このように、すべての哲学者 (go ブロック) が待ち状態となり先へ進むことができなくなります。これを「デッドロック (deadlock)」といいます。哲学者全員が右側のフォークを取り、左側のフォークが置かれるのを待つときにデッドロックとなるわけです。
デッドロックを防止する簡単な方法は、右側のフォークを取っても左側のフォークを取れないときは、右側のフォークを元に戻すことです。プログラムは次のようになります。
リスト : デッドロックの防止 (1) ;; 左側のフォークを要求する (defn get-fork1 [fork sch] (let [ich (chan)] (go (>! sch (->Req :get fork ich)) (<! ich)))) ;; 哲学者 (defn person [m rfork lfork sch quit] (go-loop [n 2] (if (zero? n) (do (printf "Philosopher%d is sleeping\n" m) (flush) (>! quit true)) (do (printf "Philosopher%d is thinking\n" m) (flush) (<! (timeout 1000)) (<! (get-fork rfork sch)) (if (<! (get-fork1 lfork sch)) (do (printf "Philosopher%d is eating\n" m) (flush) (<! (timeout 500)) (<! (ret-fork rfork sch)) (<! (ret-fork lfork sch)) (recur (dec n))) (do (ret-fork rfork sch) (recur n)))))))
右側のフォークを取ったあと、関数 get-fork1 で左側のフォークを要求します。フォークを受け取った場合は true を返すので、食事をすることができます。false の場合は右側のフォークを返却して思索に戻ります。
Lua のようなノンプリエンプティブなコールチンの場合、これでデッドロックを解消して正常に動作するのですが、core.async の go ブロックやプリエンプティブなスレッドでは新たな問題が発生します。
実行結果は次のようになります。
$ clj -M-m ph1 Philosopher1 is thinking Philosopher2 is thinking Philosopher3 is thinking Philosopher4 is thinking Philosopher5 is thinking Philosopher1 is thinking Philosopher2 is thinking Philosopher3 is thinking Philosopher4 is thinking Philosopher5 is thinking Philosopher1 is thinking Philosopher3 is thinking Philosopher4 is thinking Philosopher2 is thinking Philosopher5 is thinking <-- CTRL-C 入力
哲学者全員が右側のフォークを受け取っては返却することを繰り返すため、次の状態へ進むことができません。デッドロックではありませんが、無限ループに陥っているわけです。このような状態を「ライブロック (livelock)」といいます。
「哲学者の食事」の場合、ライブロックを解消する簡単な方法があります。フォークが残り 1 本の場合、右側のフォークを要求されたらそれを待たせることにするのです。左側のフォークであれば、その要求を受け付けます。4 人の哲学者が右側のフォークを持ったとき、5 人目の哲学者は右側のフォークを持つことができません。次に、4 人のうちの誰かが左側のフォークを要求し、それが受け付けられるので、最低でもひとりの哲学者が食事をすることができます。
プログラムは次のようになります。
リスト : レコードの定義 ;; req :get, :ret ;; fork 0 - 4 ;; side :left, :right ;; rpley 返信用チャネル (defrecord Req [req fork side reply])
レコード Req にフィールド変数 side を追加します。右側のフォークであればキーワード :right を、左側のフォークであれば :left をセットします。
次に関数 forks を修正します。
リスト : フォークの管理 (defn forks [n sch] (let [fork-table (for [_ (range n)] (atom true)) fork-num (atom n)] (go-loop [] (let [{:keys [req fork side reply]} (<! sch)] (cond (= req :get) (if @(nth fork-table fork) (if (and (== @fork-num 1) (= side :right)) (>! reply false) (do (swap! (nth fork-table fork) not) (swap! fork-num dec) (>! reply true))) (>! reply false)) (= req :ret) (do (swap! (nth fork-table fork) not) (swap! fork-num inc) (>! reply true))) (recur)))))
フォークの残数を変数 fork-num で管理します。フォークの使用を許可するとき、@fork-numn が 1 で side が :right の場合は許可しません。それ以外の場合はフォークの使用を許可します。フォークの使用を許可したときは fork-num の値を -1 して、フォークが返却された場合は fork-num の値を +1 することをお忘れなく。
次はフォークを取得する関数 get-fork を修正します。
リスト : フォークの取得 (defn get-fork [fork side sch] (let [ich (chan) req (->Req :get fork side ich)] (go-loop [] (>! sch req) (cond (<! ich) (do (<! (timeout 100)) fork) (= side :left) false :else (do (<! (timeout 500)) (recur))))))
チャネル ich からの応答が false の場合、それが左側のフォークであれば false を返します。それ以外の処理は今までと同じです。
最後に関数 person を修正します。
リスト : ライブロックの解消 (defn person [m rfork lfork sch quit] (go-loop [n 2] (if (zero? n) (do (printf "Philosopher%d is sleeping\n" m) (flush) (>! quit true)) (do (printf "Philosopher%d is thinking\n" m) (flush) (<! (timeout 1000)) (<! (get-fork rfork :right sch)) (if (<! (get-fork lfork :left sch)) (do (printf "Philosopher%d is eating\n" m) (flush) (<! (timeout 500)) (<! (ret-fork rfork :rigth sch)) (<! (ret-fork lfork :left sch)) (recur (dec n))) (do (ret-fork rfork :right sch) (recur n)))))))
get-fork で左側のフォークを要求し、応答が false の場合は retFork で右側のフォークを返却します。あとの処理は今までと同じです。
それでは実行してみましょう。
$ clj -M -m ph2 Philosopher1 is thinking Philosopher2 is thinking Philosopher4 is thinking Philosopher3 is thinking Philosopher5 is thinking Philosopher5 is thinking Philosopher4 is thinking Philosopher3 is thinking Philosopher1 is eating Philosopher1 is thinking Philosopher2 is eating Philosopher4 is thinking Philosopher5 is eating Philosopher2 is thinking Philosopher5 is thinking Philosopher3 is thinking Philosopher4 is eating Philosopher1 is eating Philosopher4 is thinking Philosopher1 is sleeping Philosopher3 is eating Philosopher2 is thinking Philosopher5 is eating Philosopher3 is thinking Philosopher4 is thinking Philosopher5 is sleeping Philosopher2 is eating Philosopher4 is eating Philosopher2 is sleeping Philosopher3 is thinking Philosopher4 is sleeping Philosopher3 is eating Philosopher3 is sleeping
どの哲学者も 2 回食事をして睡眠まで到達しています。
もうひとつ簡単な方法を紹介しましょう。奇数番目の哲学者は、まず左側のフォークを取り上げてから右側のフォークを取り、偶数番目の哲学者は、今までのように右側のフォークを取り上げてから左側のフォークを取ります。こんな簡単な方法で動作するのは不思議なように思います。たとえば、哲学者が 2 人の場合を考えてみてください。
哲学者 0 の右側のフォークを A、左側のフォークを B とします。哲学者 1 からみると、B が右側のフォークで、A が左側のフォークになります。デッドロックは、哲学者 0 が A を取り、哲学者 1 が B を取ったときに発生します。ここで、哲学者 1 が左側のフォーク A から取るようにします。先に哲学者 0 が A を取った場合、哲学者 1 は A があくまで待つことになるので、哲学者 0 はフォーク B を取って食事をすることができます。哲学者 1 が先にフォーク A を取った場合も同じです。これでデッドロックを防止することができます。
プログラムは次のようになります。
リスト : デッドロックの防止 (2) (defn person [m rfork lfork sch quit] (go-loop [n 2] (if (zero? n) (do (printf "Philosopher%d is sleeping\n" m) (flush) (>! quit true)) (do (printf "Philosopher%d is thinking\n" m) (flush) (<! (timeout 1000)) (if (even? m) (do (<! (get-fork rfork sch)) (<! (get-fork lfork sch))) (do (<! (get-fork lfork sch)) (<! (get-fork rfork sch)))) (printf "Philosopher%d is eating\n" m) (flush) (<! (timeout 500)) (<! (ret-fork rfork sch)) (<! (ret-fork lfork sch)) (recur (dec n))))))
if で m が偶数の場合は右側から、奇数の場合は左側のフォークから取るように処理を分けるだけです。
実行結果は次のようになります。
$ clj -M -m ph3 Philosopher5 is thinking Philosopher1 is thinking Philosopher2 is thinking Philosopher3 is thinking Philosopher4 is thinking Philosopher4 is eating Philosopher2 is eating Philosopher4 is thinking Philosopher2 is thinking Philosopher5 is eating Philosopher3 is eating Philosopher3 is thinking Philosopher5 is thinking Philosopher4 is eating Philosopher1 is eating Philosopher4 is sleeping Philosopher1 is thinking Philosopher3 is eating Philosopher5 is eating Philosopher3 is sleeping Philosopher5 is sleeping Philosopher2 is eating Philosopher2 is sleeping Philosopher1 is eating Philosopher1 is sleeping
正常に動作していますね。興味のある方はいろいろ試してみてください。
;;; ;;; ph2.clj : 哲学者の食事問題 ;;; ;;; Copyright (C) 2025 Makoto Hiroi ;;; (ns ph2 (:require [clojure.core.async :refer [chan go go-loop timeout <!! <! >!]])) ;; フォークのリクエスト ;; req :get, :ret ;; fork 0 - 4 ;; side :left, :right ;; rpley 応答用チャネル (defrecord Req [req fork side reply]) ;; フォークサーバー (defn forks [n sch] (let [fork-table (for [_ (range n)] (atom true)) fork-num (atom n)] (go-loop [] (let [{:keys [req fork side reply]} (<! sch)] (cond (= req :get) (if @(nth fork-table fork) (if (and (== @fork-num 1) (= side :right)) (>! reply false) (do (swap! (nth fork-table fork) not) (swap! fork-num dec) (>! reply true))) (>! reply false)) (= req :ret) (do (swap! (nth fork-table fork) not) (swap! fork-num inc) (>! reply true))) (recur))))) ;; フォークを取る (defn get-fork [fork side sch] (let [ich (chan) req (->Req :get fork side ich)] (go-loop [] (>! sch req) (cond (<! ich) (do (<! (timeout 100)) fork) (= side :left) false :else (do (<! (timeout 500)) (recur)))))) ;; フォークの返却 (defn ret-fork [fork side sch] (let [ich (chan)] (go (<! (timeout 100)) (>! sch (->Req :ret fork side ich)) (<! ich)))) ;; 哲学者 (defn person [m rfork lfork sch quit] (go-loop [n 2] (if (zero? n) (do (printf "Philosopher%d is sleeping\n" m) (flush) (>! quit true)) (do (printf "Philosopher%d is thinking\n" m) (flush) (<! (timeout 1000)) (<! (get-fork rfork :right sch)) (if (<! (get-fork lfork :left sch)) (do (printf "Philosopher%d is eating\n" m) (flush) (<! (timeout 500)) (<! (ret-fork rfork :rigth sch)) (<! (ret-fork lfork :left sch)) (recur (dec n))) (do (ret-fork rfork :right sch) (recur n))))))) (defn -main [] (let [sch (chan) quit (chan)] (forks 5 sch) (person 1 0 1 sch quit) (person 2 1 2 sch quit) (person 3 2 3 sch quit) (person 4 3 4 sch quit) (person 5 4 0 sch quit) (dotimes [_ 5] (<!! quit))))
;;; ;;; ph3.clj : 哲学者の食事問題 ;;; ;;; Copyright (C) 2025 Makoto Hiroi ;;; (ns ph3 (:require [clojure.core.async :refer [chan go go-loop timeout <!! <! >!]])) ;; フォークのリクエスト ;; req :get, :ret ;; fork 0 - 4 ;; rpley 応答用チャネル (defrecord Req [req fork reply]) ;; フォークサーバー (defn forks [n sch] (let [fork-table (for [_ (range n)] (atom true))] (go-loop [] (let [{:keys [req fork reply]} (<! sch)] (cond (= req :get) (if @(nth fork-table fork) (do (swap! (nth fork-table fork) not) (>! reply true)) (>! reply false)) (= req :ret) (do (swap! (nth fork-table fork) not) (>! reply true))) (recur))))) ;; フォークを取る (defn get-fork [fork sch] (let [ich (chan) req (->Req :get fork ich)] (go-loop [] (>! sch req) (if (<! ich) (do (<! (timeout 100)) fork) (do (<! (timeout 500)) (recur)))))) ;; フォークの返却 (defn ret-fork [fork sch] (let [ich (chan)] (go (<! (timeout 100)) (>! sch (->Req :ret fork ich)) (<! ich)))) ;; 哲学者 (defn person [m rfork lfork sch quit] (go-loop [n 2] (if (zero? n) (do (printf "Philosopher%d is sleeping\n" m) (flush) (>! quit true)) (do (printf "Philosopher%d is thinking\n" m) (flush) (<! (timeout 1000)) (if (even? m 2) (do (<! (get-fork rfork sch)) (<! (get-fork lfork sch))) (do (<! (get-fork lfork sch)) (<! (get-fork rfork sch)))) (printf "Philosopher%d is eating\n" m) (flush) (<! (timeout 500)) (<! (ret-fork rfork sch)) (<! (ret-fork lfork sch)) (recur (dec n)))))) (defn -main [] (let [sch (chan) quit (chan)] (forks 5 sch) (person 1 0 1 sch quit) (person 2 1 2 sch quit) (person 3 2 3 sch quit) (person 4 3 4 sch quit) (person 5 4 0 sch quit) (dotimes [_ 5] (<!! quit))))