M.Hiroi's Home Page

Functional Programming

お気楽 Standard ML of New Jersey 入門

[ PrevPage | SML/NJ | NextPage ]

コルーチン (2)

今回はコルーチンの応用例として、簡単な「並行プログラミング」に挑戦してみましょう。並行プログラミングといっても、複数のプログラム (関数) を擬似的に並行に動かすだけなので、大げさに考えないでくださいね。ノンプリエンプティブなマルチプロセス (マルチタスク) の基本的な動作は、コルーチンを使って簡単に実装することができます。

●並行プログラミングとは?

「並行 (concurrent) プログラミング」は複数のプログラムを並行に実行しますが、このとき複数の CPU で同時に動かす場合と、ひとつの CPU で複数のプログラムを動かす場合があります。一般的には、前者を「並列 (parallel) プログラミング」といい、複数のハードウェアを並列に実行することによる処理速度の向上が主な目的となります。

後者の場合、一定時間毎に実行するプログラムを切り替えることで、複数のプログラムを並列に実行しているかのように見せることができます。この処理を「時分割 (time sharing)」もしくは「タイム・スライス (time slice)」といいます。一般に、タイム・スライスは OS でサポートされている機能です。OS が実行するプログラムのことを「プロセス (process)」または「タスク (task)」といいます。本稿ではプロセスとタスクは同じものとして扱います。

並列的に動作するプログラムをひとつのプロセスだけで作るのはけっこう大変です。そこで、プロセス内では逐次的な処理にとどめ、複数のプロセス間で情報交換を行うことにより、全体で並列的な動作を実現することを考えます。このほうが自然にプログラムを記述できる場合があるのです。これが後者の主な目的となります。

プロセスは互いに独立したプログラムですが、OS にはプロセス間でデータをやり取りする機能 (プロセス間通信) が用意されています。たとえば、UNIX ライクな OS では「パイプ (pipe)」を使って複数のプログラム (コマンド) を連結することができます。この場合、パイプを通してデータがプログラムに送られ、各プログラムは並行に動作することになります。入出力処理で待たされるコマンドがあったとしても、その間に他のコマンドが実行されるので、各コマンドを逐次的に実行するよりも、効率的に処理することが可能です。

最近では、ひとつのプログラムの中で独立した複数の処理を記述できるようになりました。この機能を「スレッド (thread)」とか「マルチスレッド」いいます。スレッドは「縫い糸」という意味ですが、プログラムでは「制御の流れ」という意味で使われています。並列的な動作をプログラムする場合、逐次的な処理をひとつのスレッドに割り当て、複数のスレッドを並行に動作させることにより、全体で並列的な動作を実現するわけです。

また、スレッドは同じプロセス内に存在するので、メモリ空間を共有することができます。これを「共有メモリ」といいます。スレッド間の通信は共有メモリを使って簡単に行うことができますが、プリエンプティブなスレッドの場合、共有メモリのアクセス時に発生する「競合」が問題になります。このため、マルチスレッドをサポートしているプログラミング言語では、競合を回避するための仕組みが用意されています。

今回作成するプログラムはコルーチンを利用したノンプリエンプティブなものなので、競合について考慮する必要はありません。ただし、複数のプロセス間で「通信 communication)」を行うので、待ち合わせが必要になることがあります。この処理を「同期 (synchronization)」といいます。並行プログラミングの場合、通信と同期という 2 つの処理が重要になります。

●簡単なマルチプロセスの作成

それではプログラムを作りましょう。プロセスは引数なしの関数で表します。一般に、プロセスには優先順位 (プライオリティ) がありますが、今回はプログラムを簡単にするため、すべてのプロセスは同じ優先順位とします。この場合、コルーチンをそのままプロセスとして扱うと簡単です。

最初に、メインプロセスをひとつ用意し、そこでコルーチンを生成して呼び出します。中断したコルーチンはキューに格納しておけばいいでしょう。つまり、キューからコルーチンを取り出して実行を再開し、中断したら再びキューに追加すればいいわけです。コルーチンの実行が終了した場合、そのコルーチンはキューに追加しません。これで生成したプロセスを擬似的にですが並行に実行することができます。

なお、今回使用するコルーチンは親コルーチンに値を返す必要がないので、前回最初に作成したコルーチンを切り替えるだけのものとします。

プログラムは次のようになります。

リスト : 簡単なマルチプロセス

(* 中断中のプロセスを格納するキュー *)
val proc_queue = make_queue(128, coroutine_create(fn () => ()))

(* プロセスの生成 *)
fun fork(f) = enqueue(proc_queue, coroutine_create(f))

(* メインプロセス *)
fun main_process(args) =
    let
      fun iter() =
          if queueEmpty(proc_queue) then ()
          else
            let
              val p = dequeue(proc_queue)
            in
              coroutine_resume(p);
              if deadCoroutine(p) then () else enqueue(proc_queue, p);
              iter()
            end
    in
      app (fn x => fork(x)) args;
      iter()
    end

(* 実行権の放棄 *)
fun yield () = coroutine_yield ()

(* 待機 *)
fun wait(pred) = (
      coroutine_yield ();
      if not(pred()) then wait(pred) else ()
    )

大域変数 proc_queue には中断したプロセスを格納するキューをセットします。このキューは配列を使って実装しています。キューの実装はあとで詳しく説明します。プロセスの生成は関数 fork で行います。引数 f はデータ型が unit -> unit の関数とします。coroutine_create でコルーチンを生成し、それを関数 enqueue でキューに追加します。

メインプロセスの実行は関数 main_process で行います。引数 args はリストで、要素はプロセスの実体を表す関数です。最初に引数 args から app で関数をひとつずつ取り出し、それを fork に渡してプロセスを生成します。あとは関数 dequeue でキューからプロセスを順番に取り出して変数 p にセットし、coroutine_resume でプロセス p の実行を再開します。関数 deadCoroutine はコルーチンが終了している場合は true を返します。false の場合、プロセス p はまだ終了していないので、enqueue で p をキューに追加します。

関数 wait は述語 pred が真を返すまでプロセスを待機させます。まず coroutine_yield を評価して main-process に戻ります。これで他のプロセスに実行権を渡すことができます。プロセスが再開されると、述語 pred を評価して、偽の場合は wait を再帰呼び出してプロセスを待機状態にします。関数 yield はプロセスの実行権を他のプロセスに渡すだけです。

●簡単な実行例

それでは簡単な例を示しましょう。次のリストを見てください。

リスト : 簡単なテスト

fun test0(_, 0) = ()
|   test0(name, n) = (
      print(name ^ " " ^ Int.toString(n) ^ "\n");
      yield();
      test0(name, n - 1)
    )

test0 は name を n 回表示します。name と n を表示したあと、yield で実行権を放棄します。これで他のプロセスが実行されるので、複数のプロセスを並行に動作させることができます。実行例を示します。

- main_process([fn () => test0("foo", 7), fn() => test0("bar", 5)]);
foo 7
bar 5
foo 6
bar 4
foo 5
bar 3
foo 4
bar 2
foo 3
bar 1
foo 2
foo 1
val it = () : unit
- main_process([fn () => test0("foo", 5), fn() => test0("bar", 4), fn() => test0("baz", 3)]);
foo 5
bar 4
baz 3
foo 4
bar 3
baz 2
foo 3
bar 2
baz 1
foo 2
bar 1
foo 1
val it = () : unit

このように、他のプロセスと通信を行わない場合、プログラムはとても簡単になります。

また、次に示すような擬似的なタイマーを作ることもできます。

リスト : タイマーもどき

fun make_timer(n) =
    let
      val m = ref n
    in
      fn () => (m := !m - 1; !m < 0)
    end

make_timer はクロージャを返します。クロージャは評価するたびに変数 m の値を -1 します。m が負になったら真を返します。wait と make_timer を組み合わせると、処理を n 回スキップする、つまり時間待ちと同様の効果を得ることができます。

簡単な例を示します。

リスト : 簡単なテスト (2)

fun test01(_, 0, _) = ()
|   test01(name, n, m) = (
      print(name ^ " " ^ Int.toString(n) ^ "\n");
      wait(make_timer(m));
      test01(name, n - 1, m)
    )

wait(make_timer(m)) で時間待ちを行います。たとえば、m に 0 を指定すると時間待ちは行われませんが、1 を指定すると処理を 1 回スキップすることになります。

それでは実際に試してみましょう。

- main_process([fn () => test01("foo", 10, 0), fn() => test01("bar", 5, 1)]);
foo 10
bar 5
foo 9
foo 8
bar 4
foo 7
foo 6
bar 3
foo 5
foo 4
bar 2
foo 3
foo 2
bar 1
foo 1
val it = () : unit

bar を表示する処理は 1 回待たされるので、foo と bar の表示は 2 対 1 の割合になります。

●配列によるキューの実装

ここで配列を使ったキューの実装方法について簡単に説明します。キューは配列を使って簡単に実現できます。先頭位置を示す front と末尾を示す rear を用意し、front と rear の間にあるデータをキューに格納されているデータとするのがポイントです。次の図を見てください。


                      図 : キューの動作

まずキューは空の状態で、rear, front ともに 0 です。データの追加は、rear が示す位置にデータを書き込み、rear の値をインクリメントします。データ 10, 20, 30 を追加すると、図のようにデータが追加され rear は 3 になります。このとき front は 0 のままなので、先頭のデータは 10 ということになります。

次に、データを取り出す場合、front の示すデータを取り出してから front の値をインクリメントします。この場合、front が 0 なので 10 を取り出して front の値は 1 となり、次のデータ 20 が先頭になります。データを順番に 20, 30 と取り出していくと、3 つしかデータを書き込んでいないので当然キューは空になります。このとき front は 3 になり rear と同じ値になります。このように、front と rear の値が 0 の場合だけが空の状態ではなく、front と rear の値が等しくなると、キューは空になることに注意してください。

rear, fornt ともに値は増加していく方向なので、いつかは配列の範囲をオーバーします。このため、配列を先頭と末尾がつがっているリング状と考え、rear, front が配列の範囲を超えたら 0 に戻すことにします。これを「循環配列」とか「リングバッファ」と呼びます。一般に、配列を使ってキューを実現する場合は、リングバッファとするのがふつうです。

プログラムは次のようになります。

リスト : 配列によるキューの実装

(* 例外 *)
exception Queue_empty
exception Queue_full

(* データ型の定義 Q(r, w, c, size, buff) *)
datatype 'a queue = Q of int ref * int ref * int ref * int * 'a array

(* キューの生成 *)
fun make_queue(size, init_value) =
    Q(ref 0, ref 0, ref 0, size, Array.array(size, init_value))

(* データの挿入 *)
fun enqueue(Q(_, w, c, size, buff), x) =
    if !c = size then raise Queue_full
    else (
      Array.update(buff, !w, x);
      c := !c + 1;
      w := !w + 1;
      if !w = size then w := 0 else ()
    )

(* データの取り出し *)
fun dequeue(Q(r, _, c, size, buff)) =
    if !c = 0 then raise Queue_empty
    else
      let
        val x = Array.sub(buff, !r)
      in
        c := !c - 1;
        r := !r + 1;
        if !r = size then r := 0 else ();
        x
      end

(* キューは空か *)
fun queueEmpty(Q(_, _, ref c, _, _)) = c = 0

(* キューは満杯か *)
fun queueFull(Q(_, _, ref c, size, _)) = c = size
val make_queue = fn : int * 'a -> 'a queue
val enqueue = fn : 'a queue * 'a -> unit
val dequeue = fn : 'a queue -> 'a
val queueEmpty = fn : 'a queue -> bool
val queueFull = fn : 'a queue -> bool

最初にキューを表すデータ型 'a queue を定義します。'a が格納する要素の型を表します。先頭の要素から順番に、リードカウンタ (front)、ライトカウンタ (rear)、キュー内のデータ数 (count)、キューのサイズ、キュー本体を表す配列となります。先頭から 3 つの要素は値を書き換えるので ref 変数となります。

関数 make_queue はキューを生成します。このとき、キューの大きさと要素の初期値を指定します。初期値はダミーデータになるので、格納する要素の実データであれば何でもかまいせん。

関数 enqueue はデータ x をキューに追加します。要素数 c が size と等しい場合、キューは満杯なのでエラー Queue_full を送出します。そうでなければ、buff の w 番目に x を書き込み、c と w を +1 します。w が size と等しくなったならば 0 に戻します。

関数 dequeue はキューからデータを取り出して返します。要素数 c が 0 ならば、キューは空なのでエラー Queue_empty を送出します。そうでなければ、buff の r 番目の要素を取り出して変数 x にセットします。そして、c を -1, r を +1 します。r が size と等しくなったならば 0 に戻します。

関数 queueEmpty はキューが空ならば true を返し、関数 queueFull はキューが満杯ならば true を返します。

簡単な実行例を示します。

- val q = make_queue(8, 0);
val q = Q (ref 0,ref 0,ref 0,8,[|0,0,0,0,0,0,0,0|]) : int queue
- enqueue(q, 1);
val it = () : unit
- enqueue(q, 2);
val it = () : unit
- enqueue(q, 3);
val it = () : unit
- enqueue(q, 4);
val it = () : unit
- dequeue(q);
val it = 1 : int
- dequeue(q);
val it = 2 : int
- dequeue(q);
val it = 3 : int
- dequeue(q);
val it = 4 : int

このように、キューにデータを追加した順番で、キューからデータが取り出されます。

●キューによる同期処理

次はプロセス間で通信を行う場合を考えてみましょう。この場合、いろいろな方法が考えられますが、今回は簡単な例としてキューを使ってみましょう。キューはベクタを使って実装します。プログラムは次のようになります。

リスト : キュー (ベクタによる実装)

(* プロセス間通信用キュー *)
(* データ型の定義 PQ(r, w, c, size, buff) *)
datatype 'a proc_queue = PQ of int ref * int ref * int ref * int * 'a array

(* キューの生成 *)
fun make_proc_queue(size, init_value) =
    PQ(ref 0, ref 0, ref 0, size, Array.array(size, init_value))

(* データの挿入 *)
fun proc_enqueue(PQ(_, w, c, size, buff), x) = (
      wait(fn () => !c < size);
      Array.update(buff, !w, x);
      c := !c + 1;
      w := !w + 1;
      if !w = size then w := 0 else ()
    )

(* データの取り出し *)
fun proc_dequeue(PQ(r, _, c, size, buff)) = (
      wait(fn () => !c > 0);
      let
        val x = Array.sub(buff, !r)
      in
        c := !c - 1;
        r := !r + 1;
        if !r = size then r := 0 else ();
        x
      end
    )
val make_proc_queue = fn : int * 'a -> 'a proc_queue
val proc_enqueue = fn : 'a proc_queue * 'a -> unit
val proc_dequeue = fn : 'a proc_queue -> 'a

ポイントはキューにデータを追加する proc_enqueue とデータを取り出す proc_dequeue で待ち合わせを行うところです。proc_enqueue では、キューが満杯のときに wait で待ち合わせを行います。逆に proc_dequeue の場合、キューが空のときに wait で待ち合わせを行います。これによって、プロセス間での同期処理が可能になります。

また、キューの大きさが少ない場合でも、データを書き込むプロセスと読み出すプロセスが並行に動作することで、キューの大きさ以上のデータを受け渡すことができます。

それでは簡単な実行例を示します。次のリストを見てください。

リスト : キューの実行例

val pq = make_proc_queue(10, "")

(* データを送る *)
fun send_color(_, 0) = ()
|   send_color(color, n) = (
      proc_enqueue(pq, color);
      send_color(color, n - 1)
    )

(* データを受け取る *)
fun receive_color(0) = ()
|   receive_color(n) = (
      print(proc_dequeue(pq) ^ "\n");
      receive_color(n - 1)
    )

(* 実行 *)
fun test_color() =
    main_process([fn () => send_color("red", 8),
                  fn () => send_color("blue", 8),
                  fn () => send_color("yellow", 8),
                  fn () => receive_color(24)])

make_proc_queue でキューを生成して大域変数 pq に格納します。キューの大きさは 10 とします。send_color はデータ (color) を n 個キューに書き込みます。receive_color は n 個のデータをキューから取り出して表示します。test_color では、キューに書き込むプロセスを 3 つ生成し、取り出すプロセスを 1 つ生成します。データを書き込む総数は 8 * 3 = 24 個なので、取り出すデータ数も 24 個とします。

それでは実行結果を示します。

- test_color();
red
blue
yellow
red
blue
yellow
red
blue
yellow
red
blue
yellow
red
blue
red
red
red
blue
blue
blue
yellow
yellow
yellow
yellow
val it = () : unit

24 個のデータすべて表示されています。キューが満杯になると、receive_color でデータを取り出さない限り、データを書き込むことはできません。このため、receive_color のあとに評価されるプロセスが優先されることになり、red が続けて書き込まれ、そのあとに blue が、最後に yellow がキューに書き込まれることになります。

send_color に wait(make-timer(1)) を追加すると、receive_color のプロセスのほうが多く評価されることになるため、red, blue, yellow の順番でデータが取り出されるようになります。

●哲学者の食事

最後に、「哲学者の食事」という並行プログラミングでは有名な問題を解いてみましょう。

[哲学者の食事]

5 人の哲学者が丸いテーブルに座っています.テーブルの中央にはスパゲッティが盛られた大皿があり、哲学者の間には 5 本のフォークが置かれています。哲学者は思索することとスパゲッティを食べることを繰り返します。食事のときには 2 本のフォークを持たなければなりません。食事が終わると 2 本のフォークを元の位置に戻します。

詳しい説明は 食事する哲学者の問題 -- Wikipedia をお読みください。

それではプログラムを作りましょう。最初にフォークを操作する関数を定義します。

リスト : フォークを操作する関数

(* フォークの有無を表す配列 *)
val forks = Array.array(5, true)

(* フォークがあるか *)
fun isFork(person, side) =
    Array.sub(forks, if side = "right" then person else (person + 1) mod 5)

(* フォークの書き換え *)
fun forkSet(person, side, v) =
    Array.update(forks,
                 if side = "right" then person else (person + 1) mod 5,
                 v)

(* フォークを取る *)
fun getFork(person, side) = (
      wait(fn () => isFork(person, side));
      forkSet(person, side, false)
    )

(* フォークを置く *)
fun putFork(person, side) = (
      forkSet(person, side, true);
      yield()
    )
val isFork = fn : int * string -> bool
val forkSet = fn : int * string * bool -> unit
val getFork = fn : int * string -> unit
val putFork = fn : int * string -> unit

フォークの有無は真偽値で表して、それを配列に格納します。配列は大域変数 forks にセットします。n 番目の哲学者の場合、右側のフォークがベクタの n 番目の要素、左側のフォークが n + 1 番目の要素になります。関数 isFork は n 番目の哲学者の side にフォークがあるとき真を返します。forkSet は n 番目の哲学者の side 側にあるフォークの値を v で書き換えます。

getFork はフォークを取る関数です。wait で isFork が真を返すまで待ちます。そのあとで、forkSet で対応するフォークの値を false に書き換えます。putFork はフォークを元に戻す関数です。forkSet でフォークの値を true に書き換え、yiled を評価して他のプロセスに実行権を渡します。

今回はノンプリエンプティブなコルーチンを使用しているので、forks をアクセスするときに競合は発生しません。プリエンプティブなマルチスレッドを使用する場合、共有メモリにアクセスするときは競合について考慮する必要があります。ご注意ください。

次は哲学者の動作をプログラムします。次のリストを見てください。

リスト : 哲学者の動作

fun person0(n) =
    let
      fun iter(2) = print("Philosopher" ^ Int.toString(n) ^ " is sleeping\n")
      |   iter(m) = (
            print("Philosopher" ^ Int.toString(n) ^ " is thinking\n");
            getFork(n, "right");
            getFork(n, "left");
            print("Philosopher" ^ Int.toString(n) ^ " is eating\n");
            yield();
            putFork(n, "right");
            putFork(n, "left");
            iter(m + 1)
          )
    in
      iter(0)
    end

関数 person0 の引数 n は哲学者の番号を表します。局所関数 itre の引数 m は食事をした回数です。2 回食事をしたら処理を終了します。食事をする場合、最初に getFork で右側のフォークを取り、次に左側のフォークを取ります。食事を終えたら putFork で右側のフォークを置き、次に左側のフォークを置きます。

このように、マルチプロセスを使うと簡単にプログラムできますが、実は並行プログラミング特有の大きな問題点があるのです。これはプログラムを実行してみるとわかります。

●実行結果 (1)

プログラムの実行は関数 test2 で行います。

リスト : 実行

fun test2(person) = (
      app (fn x => Array.update(forks, x, true)) [0,1,2,3,4];
      main_process([fn () => person(0),
                  fn () => person(1),
                  fn () => person(2),
                  fn () => person(3),
                  fn () => person(4)])
    )

最初に配列 forks を true で初期化します。そして、main_process に 5 人の哲学者を表すプロセスを渡して評価します。実行結果は次のようになります。

- test2(person0);
Philosopher0 is thinking
Philosopher1 is thinking
Philosopher2 is thinking
Philosopher3 is thinking
Philosopher4 is thinking

Interrupt

このように、すべてのプロセスが待ち状態となり進むことができなくなります。これを「デッドロック (deadlock)」といいます。この場合、0 番目の哲学者の右側のフォークは、4 番目の哲学者の左側のフォークになります。各哲学者が右側のフォークを取り、左側のフォークが置かれるのを待つときにデッドロックとなるわけです。

●デッドロックの防止

デッドロックを防止する簡単な方法は、右側のフォークを取っても左側のフォークを取れないときは、右側のフォークを元に戻すことです。プログラムは次のようになります。

リスト : デッドロックの防止 (1)

fun person1(n) =
    let
      fun iter(2) = print("Philosopher" ^ Int.toString(n) ^ " is sleeping\n")
      |   iter(m) = (
            print("Philosopher" ^ Int.toString(n) ^ " is thinking\n");
            getFork(n, "right");
            if isFork(n, "left") then (
              forkSet(n, "left", false);
              print("Philosopher" ^ Int.toString(n) ^ " is eating\n");
              yield();
              putFork(n, "right");
              putFork(n, "left");
              iter(m + 1)
            ) else (
              putFork(n, "right");
              iter(m)
            )
          )
    in
      iter(0)
    end

右側のフォークを取ったあと、isFork で左側のフォークをチェックします。フォークがある場合は、forkSet で値を true に書き換えます。これでフォークを取って食事をすることができます。getFork を使うと他のプロセスに実行権が移るため、フォークの状態が変わる可能性があります。左側のフォークがない場合は putFork で右側のフォークを元に戻します。

●実行結果 (2)

実行結果は次のようになります。

- test2(person1);
Philosopher0 is thinking
Philosopher1 is thinking
Philosopher2 is thinking
Philosopher3 is thinking
Philosopher4 is thinking
Philosopher0 is eating
Philosopher2 is eating
Philosopher4 is thinking
Philosopher1 is eating
Philosopher3 is eating
Philosopher0 is thinking
Philosopher2 is thinking
Philosopher0 is eating
Philosopher2 is eating
Philosopher1 is thinking
Philosopher3 is thinking
Philosopher4 is thinking
Philosopher1 is eating
Philosopher3 is eating
Philosopher0 is sleeping
Philosopher2 is sleeping
Philosopher4 is eating
Philosopher1 is sleeping
Philosopher3 is sleeping
Philosopher4 is thinking
Philosopher4 is eating
Philosopher4 is sleeping
val it = () : unit

このように、デッドロックしないで実行することができます。

●デッドロックの防止 (2)

もうひとつ簡単な方法を紹介しましょう。奇数番目の哲学者は、まず左側のフォークを取り上げてから右側のフォークを取り、偶数番目の哲学者は、今までのように右側のフォークを取り上げてから左側のフォークを取ります。こんな簡単な方法で動作するのは不思議なように思います。たとえば、哲学者が 2 人の場合を考えてみてください。

哲学者 0 の右側のフォークを A、左側のフォークを B とします。哲学者 1 からみると、B が右側のフォークで、A が左側のフォークになります。デッドロックは、哲学者 0 が A を取り、哲学者 1 が B を取ったときに発生します。ここで、哲学者 1 が左側のフォーク A から取るようにします。先に哲学者 0 が A を取った場合、哲学者 1 は A があくまで待つことになるので、哲学者 0 はフォーク B を取って食事をすることができます。哲学者 1 が先にフォーク A を取った場合も同じです。これでデッドロックを防止することができます。

プログラムは次のようになります。

リスト : デッドロックの防止 (2)

fun person2(n) =
    let
      fun iter(2) = print("Philosopher" ^ Int.toString(n) ^ " is sleeping\n")
      |   iter(m) = (
            print("Philosopher" ^ Int.toString(n) ^ " is thinking\n");
            if n mod 2 = 0 then (
              getFork(n, "right");
              getFork(n, "left")
            ) else (
              getFork(n, "left");
              getFork(n, "right")
            );
            print("Philosopher" ^ Int.toString(n) ^ " is eating\n");
            yield();
            putFork(n, "right");
            putFork(n, "left");
            iter(m + 1)
          )
    in
      iter(0)
    end

if で n が偶数の場合は右側から、奇数の場合は左側のフォークから取るように処理を分けるだけです。

●実行結果 (3)

実行結果は次のようになります。

- test2(person2);
Philosopher0 is thinking
Philosopher1 is thinking
Philosopher2 is thinking
Philosopher3 is thinking
Philosopher4 is thinking
Philosopher0 is eating
Philosopher3 is eating
Philosopher1 is eating
Philosopher0 is thinking
Philosopher3 is thinking
Philosopher4 is eating
Philosopher1 is thinking
Philosopher2 is eating
Philosopher4 is thinking
Philosopher0 is eating
Philosopher3 is eating
Philosopher2 is thinking
Philosopher1 is eating
Philosopher0 is sleeping
Philosopher3 is sleeping
Philosopher4 is eating
Philosopher1 is sleeping
Philosopher2 is eating
Philosopher4 is sleeping
Philosopher2 is sleeping
val it = () : unit

正常に動作していますね。興味のある方はいろいろ試してみてください。

●参考文献, URL

  1. Paul Graham (著),野田 開 (訳), 『On Lisp』, Web 版
  2. Timothy Buddy (著), 吉田雄二 (監修), 長谷川明生・大田義勝 (訳), 『Little Smalltake 入門』, アスキー出版, 1989
  3. Ravi Sethi (著), 神林靖 (訳), 『プログラミング言語の概念と構造』, アジソンウェスレイ, 1995

●プログラムリスト

(*
 * process.sml : コルーチンを使った簡単なマルチプロセス
 *
 *               Copyright (C) 2012-2021 Makoto Hiroi
 *
 *)

(*
 * 継続によるコルーチンの実装
 * コルーチンを切り替えるだけ
 *)
open SMLofNJ.Cont

datatype coroutine = Co of (unit cont option ref) * (unit cont option cont option ref) * (unit -> unit) option ref

(* yield で戻るための継続 *)
val ret : unit cont option cont option ref = ref NONE

(* コルーチンの生成 *)
fun coroutine_create proc = Co(ref NONE, ref NONE, ref (SOME proc))

(* 実行の中断 *)
fun coroutine_yield () =
    callcc(fn k => throw (valOf(!ret)) (SOME k))

(* 実行の再開 *)
exception Coroutine_err
exception Dead_coroutine

fun coroutine_resume(Co(_, _, ref NONE)) = raise Dead_coroutine
|   coroutine_resume(Co(resume, save as (ref NONE), proc)) = (
    resume := callcc(fn k => (
      save := !ret;
      ret := SOME k;
      case !resume of
           NONE => ((valOf(!proc))(); proc := NONE; throw (valOf(!ret)) NONE)
         | (SOME x) => throw x () ));
    ret := !save;
    save := NONE;
    ()  )
|   coroutine_resume(_) = raise Coroutine_err

(* Dead Coroutine か *)
fun deadCoroutine(Co(_, _, ref proc)) =
    case proc of
         NONE => true
       | (SOME _) => false

(************************************************)

(* 配列によるキューの実装 *)

(* 例外 *)
exception Queue_empty
exception Queue_full

(* データ型の定義 Q(r, w, c, size, buff) *)
datatype 'a queue = Q of int ref * int ref * int ref * int * 'a array

(* キューの生成 *)
fun make_queue(size, init_value) =
    Q(ref 0, ref 0, ref 0, size, Array.array(size, init_value))

(* データの挿入 *)
fun enqueue(Q(_, w, c, size, buff), x) =
    if !c = size then raise Queue_full
    else (
      Array.update(buff, !w, x);
      c := !c + 1;
      w := !w + 1;
      if !w = size then w := 0 else ()
    )

(* データの取り出し *)
fun dequeue(Q(r, _, c, size, buff)) =
    if !c = 0 then raise Queue_empty
    else
      let
        val x = Array.sub(buff, !r)
      in
        c := !c - 1;
        r := !r + 1;
        if !r = size then r := 0 else ();
        x
      end

(* キューは空か *)
fun queueEmpty(Q(_, _, ref c, _, _)) = c = 0

(* キューは満杯か *)
fun queueFull(Q(_, _, ref c, size, _)) = c = size

(************************************************)

(* 簡単なマルチプロセス *)

(* 中断中のプロセスを格納するキュー *)
val proc_queue = make_queue(128, coroutine_create(fn () => ()))

(* プロセスの生成 *)
fun fork(f) = enqueue(proc_queue, coroutine_create(f))

(* メインプロセス *)
fun main_process(args) =
    let
      fun iter() =
          if queueEmpty(proc_queue) then ()
          else
            let
              val p = dequeue(proc_queue)
            in
              coroutine_resume(p);
              if deadCoroutine(p) then () else enqueue(proc_queue, p);
              iter()
            end
    in
      app (fn x => fork(x)) args;
      iter()
    end

(* 実行権の放棄 *)
fun yield () = coroutine_yield ()

(* 待機 *)
fun wait(pred) = (
      coroutine_yield ();
      if not(pred()) then wait(pred) else ()
    )

(* タイマーもどき *)
fun make_timer(n) =
    let
      val m = ref n
    in
      fn () => (m := !m - 1; !m < 0)
    end

(* 簡単なテスト *)
fun test0(_, 0) = ()
|   test0(name, n) = (
      print(name ^ " " ^ Int.toString(n) ^ "\n");
      yield();
      test0(name, n - 1)
    )

fun test01(_, 0, _) = ()
|   test01(name, n, m) = (
      print(name ^ " " ^ Int.toString(n) ^ "\n");
      wait(make_timer(m));
      test01(name, n - 1, m)
    )

(***********************************************)

(* プロセス間通信用キュー *)
(* データ型の定義 PQ(r, w, c, size, buff) *)
datatype 'a proc_queue = PQ of int ref * int ref * int ref * int * 'a array

(* キューの生成 *)
fun make_proc_queue(size, init_value) =
    PQ(ref 0, ref 0, ref 0, size, Array.array(size, init_value))

(* データの挿入 *)
fun proc_enqueue(PQ(_, w, c, size, buff), x) = (
      wait(fn () => !c < size);
      Array.update(buff, !w, x);
      c := !c + 1;
      w := !w + 1;
      if !w = size then w := 0 else ()
    )

(* データの取り出し *)
fun proc_dequeue(PQ(r, _, c, size, buff)) = (
      wait(fn () => !c > 0);
      let
        val x = Array.sub(buff, !r)
      in
        c := !c - 1;
        r := !r + 1;
        if !r = size then r := 0 else ();
        x
      end
    )

(* 簡単なテスト *)
val pq = make_proc_queue(10, "")

(* データを送る *)
fun send_color(_, 0) = ()
|   send_color(color, n) = (
      proc_enqueue(pq, color);
      send_color(color, n - 1)
    )

(* データを受け取る *)
fun receive_color(0) = ()
|   receive_color(n) = (
      print(proc_dequeue(pq) ^ "\n");
      receive_color(n - 1)
    )

fun test_color() =
    main_process([fn () => send_color("red", 8),
                  fn () => send_color("blue", 8),
                  fn () => send_color("yellow", 8),
                  fn () => receive_color(24)])

(************************************************)

(* 哲学者の食事問題 *)

(* フォークの有無を表す配列 *)
val forks = Array.array(5, true)

(* フォークがあるか *)
fun isFork(person, side) =
    Array.sub(forks, if side = "right" then person else (person + 1) mod 5)

(* フォークの書き換え *)
fun forkSet(person, side, v) =
    Array.update(forks,
                 if side = "right" then person else (person + 1) mod 5,
                 v)

(* フォークを取る *)
fun getFork(person, side) = (
      wait(fn () => isFork(person, side));
      forkSet(person, side, false)
    )

(* フォークを置く *)
fun putFork(person, side) = (
      forkSet(person, side, true);
      yield()
    )

(* 哲学者の動作 *)
fun person0(n) =
    let
      fun iter(2) = print("Philosopher" ^ Int.toString(n) ^ " is sleeping\n")
      |   iter(m) = (
            print("Philosopher" ^ Int.toString(n) ^ " is thinking\n");
            getFork(n, "right");
            getFork(n, "left");
            print("Philosopher" ^ Int.toString(n) ^ " is eating\n");
            yield();
            putFork(n, "right");
            putFork(n, "left");
            iter(m + 1)
          )
    in
      iter(0)
    end

(* デッドロックの防止1 *)
fun person1(n) =
    let
      fun iter(2) = print("Philosopher" ^ Int.toString(n) ^ " is sleeping\n")
      |   iter(m) = (
            print("Philosopher" ^ Int.toString(n) ^ " is thinking\n");
            getFork(n, "right");
            if isFork(n, "left") then (
              forkSet(n, "left", false);
              print("Philosopher" ^ Int.toString(n) ^ " is eating\n");
              yield();
              putFork(n, "right");
              putFork(n, "left");
              iter(m + 1)
            ) else (
              putFork(n, "right");
              iter(m)
            )
          )
    in
      iter(0)
    end

(* デッドロックの防止2 *)
fun person2(n) =
    let
      fun iter(2) = print("Philosopher" ^ Int.toString(n) ^ " is sleeping\n")
      |   iter(m) = (
            print("Philosopher" ^ Int.toString(n) ^ " is thinking\n");
            if n mod 2 = 0 then (
              getFork(n, "right");
              getFork(n, "left")
            ) else (
              getFork(n, "left");
              getFork(n, "right")
            );
            print("Philosopher" ^ Int.toString(n) ^ " is eating\n");
            yield();
            putFork(n, "right");
            putFork(n, "left");
            iter(m + 1)
          )
    in
      iter(0)
    end

(* 実行 *)
fun test2(person) = (
      app (fn x => Array.update(forks, x, true)) [0,1,2,3,4];
      main_process([fn () => person(0),
                  fn () => person(1),
                  fn () => person(2),
                  fn () => person(3),
                  fn () => person(4)])
    )

初版 2012 年 7 月 15 日
改訂 2021 年 5 月 29 日

Copyright (C) 2012-2021 Makoto Hiroi
All rights reserved.

[ PrevPage | SML/NJ | NextPage ]