M.Hiroi's Home Page

Functional Programming

お気楽 Erlang プログラミング入門

[ PrevPage | Erlang | NextPage ]

プロセス (前編)

今回は Erlang の「プロセス (process)」を使って、簡単な並行プログラミングに挑戦してみましょう。

●並行プログラミングとは?

「並行 (concurrent) プログラミング」は複数のプログラムを並行に実行しますが、このとき複数の CPU で同時に動かす場合と、一つの CPU で複数のプログラムを動かす場合があります。一般的には、前者を「並列 (parallel) プログラミング」といい、複数のハードウェアを並列に実行することによる処理速度の向上が主な目的となります。

後者の場合、一定時間毎に実行するプログラムを切り替えることで、複数のプログラムを並列に実行しているかのように見せることができます。この処理を「時分割 (time sharing)」もしくは「タイム・スライス (time slice)」といいます。一般に、タイム・スライスは OS でサポートされている機能です。OS が実行するプログラムのことを「プロセス (process)」または「タスク (task)」といいます。

並列的に動作するプログラムを一つのプロセスだけで作るのはけっこう大変です。そこで、プロセス内では逐次的な処理にとどめ、複数のプロセス間で情報交換を行うことにより、全体で並列的な動作を実現することを考えます。このほうが自然にプログラムを記述できる場合があるのです。これが後者の主な目的となります。

プロセスは互いに独立したプログラムですが、OS にはプロセス間でデータをやり取りする機能 (プロセス間通信) が用意されています。たとえば UNIX ライクな OS では、パイプ (pipe) を使って複数のプログラム (コマンド) を連結することができます。この場合、パイプを通してデータがプログラムに送られ、各プログラムは並行に動作することになります。入出力処理で待たされるコマンドがあったとしても、その間に他のコマンドが実行されるので、各コマンドを逐次的に実行するよりも、効率的に処理することが可能です。

最近では、一つのプログラムの中で独立した複数の処理を記述できるようになりました。この機能を「スレッド (thread)」とか「マルチスレッド」いいます。スレッドは「縫い糸」という意味ですが、プログラムでは「制御の流れ」という意味で使われています。並列的な動作をプログラムする場合、逐次的な処理を一つのスレッドに割り当て、複数のスレッドを並行に動作させることにより、全体で並列的な動作を実現するわけです。

一般に、スレッドは一定時間毎に実行するスレッドを強制的に切り替えます。このとき、スレッドのスケジューリングは処理系が行います。これを「プリエンプティブ (preemptive)」といいます。これに対し、Ruby のファイバーや Lua のコルーチンは、プログラムの実行を一定時間毎に切り替えるものではありません。他のプログラムが実行できるよう自主的に処理を中断する、といった協調的な動作を行わせることで、複数のプログラムを並行に動作させています。これを「ノンプリエンプティブ (nonpreemptive)」といいます。

スレッドは同じプロセス内に存在するので、メモリ空間を共有することができます。これを「共有メモリ」といいます。スレッド間の通信は共有メモリを使って簡単に行うことができますが、プリエンプティブなスレッドの場合、共有メモリのアクセス時に発生する競合が問題になります。このため、プリエンプティブなマルチスレッドをサポートしているプログラミング言語では、競合を回避するための仕組みが用意されています。

Erlang の「プロセス」は OS のプロセスとは異なります。どちらかというと、プリエンプティブなマルチスレッドに近いもので、プロセスは OS ではなく Erlang が管理します。ただし、Erlang のプロセスには共有メモリ [*1] がありません。非同期のプロセス間通信だけをサポートしています。Erlang のプロセスはメッセージパッシングにより任意のデータを送受信することができます。仕組みが簡単な分だけ Erlang のプロセスは軽量で、高速に動作するといわれています。また、Erlang は「並列プログラミング」にも対応しています。これはあとで試してみましょう。

-- note --------
[*1] Erlang のプロセス辞書は同じプロセス内からしかアクセスできませんが、他のプロセスからアクセスできるデータ構造 (ets や dets など) が用意されているので、共有メモリのかわりに利用することもできます。

●プロセスの起動

それではプロセスを起動してみましょう。Erlang の場合、関数 spawn/3 を用いることで、異なるプロセスで関数を実行することができます。spawn の書式を次に示します。

spawn(モジュール名, 関数名, 引数リスト) => Pid (プロセス識別子)

spawn は新しいプロセスを生成して、そのプロセス内で指定された関数を実行します。返り値は新しく生成したプロセスを表すデータで、これを Pid (プロセス識別子) といいます。spawn で呼び出した関数が他の関数を呼び出す場合、その関数は呼び出し元の関数と同じプロセスで実行されます。spawn から呼び出された関数の実行が終了すると、そのプロセスの実行も終了します。

簡単な例を示しましょう。0.5 秒間隔で Name を N 回表示するプログラムを作ります。

リスト : 0.5 秒間隔で Name を N 回表示する

-module(prcs).
-export([test/2]).

test(0, _) -> ok;
test(N, Name) ->
    io:format('~w ~w~n', [N, Name]),
    timer:sleep(500),
    test(N - 1, Name).

timer:sleep(N) はプログラムの実行を N msec 休止する関数です。test を通常の関数呼び出しすると、次のようになります。

> prcs:test(10, foo).
10 foo
9 foo
8 foo
7 foo
6 foo
5 foo
4 foo
3 foo
2 foo
1 foo
ok
> prcs:test(5, foo), prcs:test(5, bar).
5 foo
4 foo
3 foo
2 foo
1 foo
5 bar
4 bar
3 bar
2 bar
1 bar
ok

最初の例は 0.5 秒間隔で数字と foo が表示されます。次の例は test が順番に呼び出されるので、最初に foo が表示されてから、次に bar が表示されます。

spawn を用いると、foo と bar の表示が並行に行われます。それでは試してみましょう。

> spawn(prcs, test, [10, foo]).
10 foo
<0.41.0>
> 9 foo
> 8 foo
> 7 foo
> 6 foo
> 5 foo
> 4 foo
> 3 foo
> 2 foo
> 1 foo
> spawn(prcs, test, [5, foo]), prcs:test(5, bar).
5 foo
5 bar
4 bar
4 foo
3 foo
3 bar
2 bar
2 foo
1 foo
1 bar
ok

spwan で test を呼び出すと、Eshell と別のプロセスが生成されて、そのプロセスで test が実行されます。<0.41.0> は spawn が返した Pid を Eshell が表示したものです。次の例は、foo の表示を別プロセスで、bar の表示を Eshell と同じプロセスで行っています。逐次処理とは違って、foo と bar を表示する処理が並行に行われていることがわかります。

●メッセージの送信と受信

メッセージの送信は演算子 ! で、受信は receive で行います。

    Pid ! Message

図 : メッセージの送信
receive
    pattern1 -> 式, ... 式;

      ・・・・・

    patternN -> 式, ... 式
    [after Msec -> 式, ... 式]
end

    図 : メッセージの受信

Pid ! Message は Message をプロセス Pid に送信します。プロセスには専用のメールボックスが用意されていると考えてください。メッセージはそのメールボックスに入れられます。

メールボックスからメッセージを受け取るときは receive を使います。receive はメッセージを受信するまで待機しています。メッセージを受信するとメッセージとパターンを照合し、マッチングに成功した節を実行します。このとき、メッセージはメールボックスから削除されます。マッチングに失敗したメッセージはメールボックスから削除さずに残っていて、receive は次のメッセージが来るのを待ちます。

receive は after Msec で待ち時間を設定することができます。Msec はミリ秒単位で指定します。after を指定することで、タイムアウトの処理を行うことができます。

簡単な例を示しましょう。

リスト : メッセージを表示する

flush() ->
    receive
        M -> io:format('~w~n', [M]), flush()
    after
        0 -> ok
    end.

echo() ->
    receive
        {Name, N} -> io:format('~w ~w~n', [Name, N]), echo();
        stop -> flush()
    end.

関数 echo/0 は受け取ったメッセージ {Name, N} を表示するだけです。メッセージが stop の場合、関数 flush/0 を呼び出して、メールボックスにたまっているメッセージを表示して終了します。flush は after で 0 を指定しているので、メールボックスが空になった時点で処理を終了します。echo と flush は末尾再帰になっていることに注意してください。

それでは実行してみましょう。

> P0 = spawn(prcs, echo, []).
<0.45.0>
> P0 ! {foo, 0}.
foo 0
{foo,0}
> P0 ! {bar, 0}.
bar 0
{bar,0}
> P0 ! foo.
foo
> P0 ! bar.
bar
> is_process_alive(P0).
true
> P0 ! stop.
foo
stop
bar
> is_process_alive(P0).
false

spawn で echo を実行するプロセス P0 を生成します。P0 にメッセージ {foo, 0} を送信すると、foo 0 と表示されます。次の {foo, 0} は演算子 ! の返り値を Eshell が表示したものです。同様に {bar, 0} を送信すると bar 0 が表示されますが、foo と bar を送信してもパターンとマッチングしないのでメッセージは表示されません。

関数 is_process_alive/1 は、プロセスが実行中か否かを調べる関数です。実行中の場合は true を、終了した場合は false を返します。foo と bar を送信してもメッセージは表示されませんが、is_process_alive の返り値は true なので、プロセス P0 は実行中であることがわかります。

次に P0 に stop を送信します。すると、関数 flush が実行されて、メールボックスに溜まっていたメッセージ foo と bar が表示されます。stop は演算子 ! の返り値を Eshell が表示したものです。is_process_alive の返り値は false になるので、echo の実行が終了してプロセス P0 も終了したことがわかります。

●メッセージの交換

もう一つ、簡単な例を示します。今度はお互いにメッセージを交換する処理を考えてみましょう。次のリストを見てください。

リスト : メッセージの送受信

% データを送る
send_color(0, _, _) -> ok;
send_color(N, Color, P) ->
    P ! {self(), Color},
    receive
        R -> R
    end,
    send_color(N - 1, Color, P).

% データを受け取って表示する
receive_color() ->
    receive
        {P, Color} ->
            io:format('~w ', [Color]),
            P ! ok,
            receive_color();
        R ->
            io:format('illegal data ~w~n', [R]), false
    end.

関数 send_color/3 はプロセス P に色データ Color を N 回送信します。送信したあと、プロセス P からメッセージを受信するまで待ちます。このように返信が必要な場合、メッセージを送信するときに自分の Pid もいっしょに送ります。自分のプロセス識別子は関数 self() で求めることができます。その後、receive で返信が来るのを待ちます。

関数 receive_color/0 は受け取った色データ Color を表示してから、Color を送ってきたプロセス P に ok を送信します。{P, Color} 以外のメッセージは false を返して終了します。プロセスのエラー処理としては不適切だと思いますが、簡単な例題ということでご了承くださいませ。プロセスのエラー処理については次回以降で取り上げたいと思います。

それでは実行してみましょう。まずは receive_color だけ動かしてみます。

> P0 = spawn(prcs, receive_color, []).
<0.54.0>
> P0 ! {self(), red}, receive R -> R end.
red ok
> P0 ! foo.
illegal data foo
foo
> is_process_alive(P0).
false

spawn で receive_color を実行し、そのプロセス P0 にメッセージを送ります。この場合、self() の値は Eshell の Pid になります。そのあと、receive でメッセージを待ちます。red は receive_color が表示したメッセージで、ok は receive で受信したメッセージです。メッセージ foo を送信すると illegal data foo と表示され、プロセス P0 の実行は終了します。

次は send_color を並行に実行してみましょう。

> P1 = spawn(prcs, receive_color, []).
<0.59.0>
> spawn(prcs, send_color, [8, red, P1]),
 spawn(prcs, send_color, [8, blue, P1]),
 prcs:send_color(8, green, P1).
red blue green red blue green red blue green red blue green red blue green 
red blue green red blue green red blue green ok

最初に spawn で receive_color を実行します。次に spawn で 2 つの send_color を実行し、最後に send_color を関数呼び出しします。最初の 2 つは別々のプロセスで、最後の一つは Eshell と同じプロセスで実行されます。送信したデータはすべて表示されていることがわかります。最後の ok は 3 番目に実行した send_color の返り値が表示されたものです。

なお、終了したプロセスにメッセージを送ってもエラーにはならないことに注意してください。

> is_process_alive(P1).
true
> P1 ! stop.
illegal data stop
stop
> is_process_alive(P1).
false
> P1 ! stop.
stop

プロセス P1 に stop を送信すると illegal data stop と表示して P1 は終了します。このあと、P1 にメッセージ stop を送ってもエラーにはなりません。

●プロセスの同期

メッセージを使って同期をとることで、複数のプロセスを協調的に動作させることができます。たとえば、1 文字を表示するプロセスを複数個作成し、"hey! " とういう文字列を複数回画面に表示するプログラムを作ってみましょう。次のリストを見てください。

リスト : プロセスの同期 (1)

print_code(Code, P) ->
    receive
        stop -> ok;
        _ -> io:format('~s', [Code]), P ! ok, print_code(Code, P)
    end.

test_a(0, Ps) -> lists:foreach(fun(P) -> P ! stop end, Ps), ok;
test_a(N, Ps) ->
    lists:foreach(fun(P) -> P ! ok, receive _ -> ok end end, Ps),
    test_a(N - 1, Ps).

test_a(N) -> 
    test_a(N, lists:map(fun(X) -> spawn(prcs, print_code, [X, self()]) end, ["h", "e", "y", "!", " "])).

関数 print_code/2 は receive でメッセージを受信するまで待機します。stop を受信したら終了します。それ以外のメッセージであれば、format で Code を表示して、プロセス P に ok を送信します。そして、print_code を再帰呼び出しして、次のメッセージを受信するまで待機します。

test_a/1 は map と spawn で複数のプロセスを生成します。このとき、print_code に渡すプロセスは自分自身を表す self() です。test_a/2 の引数 N は "hey! " を表示する回数、Ps はプロセスを格納したリストです。foreach で Ps のプロセスに ok を送信し、そのプロセスからの返信があるまで receive で待機します。メッセージを受信したら、次のプロセスにメッセージを送信します。これを N 回繰り返したら、各プロセスにメッセージ stop を送信します。

それでは実行してみましょう。

> prcs:test_a(5).
hey! hey! hey! hey! hey! ok
> prcs:test_a(10).
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! ok

print_code の間でメッセージのやり取りを行うこともできます。次のリストを見てください。

リスト : プロセスの同期 (2)

test_b(0, Ps) -> lists:foreach(fun(P) -> P ! stop end, Ps), ok;
test_b(N, Ps) ->
    hd(Ps) ! ok,
    receive _ -> ok end,
    test_b(N - 1, Ps).

test_b(N) ->
    A = spawn(prcs, print_code, [" ", self()]),
    B = spawn(prcs, print_code, ["!", A]),
    C = spawn(prcs, print_code, ["y", B]),
    D = spawn(prcs, print_code, ["e", C]),
    E = spawn(prcs, print_code, ["h", D]),
    test_b(N, [E, D, C, B, A]).

関数 test_b は spawn で 5 つのプロセスを生成します。このとき、プロセス A の引数 Ch には self() を、プロセス B には A を渡します。つまり、次に表示させる文字のプロセスを渡していくわけです。そして、test_b/2 で Ps の先頭プロセスにメッセージを送ると、各プロセスが順番に実行されるので "hey! " を表示することができます。空白を表示したあと、メッセージは test_b/2 のプロセスに送信されるので、それを受信するまで待ちます。あとは、これを N 回繰り返すだけです。

> prcs:test_b(15).
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! ok
> prcs:test_b(20).
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! ok

●コルーチン

次はプロセスを使って「コルーチン (co-routine)」を実装してみましょう。コルーチンは「サブルーチン (sub-routine)」と比較するとわかりやすいと思います。ここではサブルーチンを関数のことと考えてください。サブルーチンは呼び出してから戻ってくるまで処理を中断することはできませんが、コルーチンは途中で処理を中断し、そこから実行を再開することができます。また、コルーチンを使うと複数のプログラムを (擬似的に) 並行に動作させることができます。

たとえば、Lua や Ruby (Fiber) などのノンプリエンプティブなコルーチンには親子関係があります。コルーチン A からコルーチン B を呼び出した場合、A が親で B が子になります。このように主従関係を持つコルーチンを「セミコルーチン (semi-coroutine)」といいます。コルーチンの親子関係は木構造と考えることができます。

コルーチンを実行 (または再開) するには関数 resume を使います。resume を呼び出したほうが親、呼び出されたほうが子になります。子コルーチンの中で関数 yield を実行すると、そこでプログラムの実行を中断して親コルーチンに戻ります。このとき、yield の引数が親コルーチンで呼び出した reusme の返り値になります。また、resume に引数を渡して実行を再開すると、それが yield の返り値となります。

コルーチンの詳しい説明は下記に示す拙作のページをお読みくださいませ。

Erlang の場合、処理を途中で中断したり、あとから処理を再開することは、メッセージを使って簡単に実装することができます。今回はプロセスをそのままコルーチンとして使うことにしましょう。ただし、Eralng はどのプロセスからでもメッセージを受信できるので、他人の子コルーチンを横取りして、その実行を再開することは可能です。また、親子のコルーチンが並行に動作することもありえます。セミコルーチンとは動作が大きく異なるところもありますが、あしからずご了承くださいませ。

それでは、コルーチンの操作関数 resume/2 と yield/1 を作りましょう。プログラムは次のようになります。

リスト : コルーチン

% プロセス P の処理を再開
resume(P, V) ->
    case is_process_alive(P) of
        true  -> P ! {next, V, self()}, receive {item, R} -> R end;
        false -> false
    end.

% プロセスの実行を中断する
yield(X) -> receive {next, V, P} -> P ! {item, X}, V end.

resume/2 はプロセス P にメッセージ next と引数 V と自分自身の Pid を送信して、receive で返信を待ちます。yield/1 は receive でメッセージ next を受信するまで待機します。受信したらメッセージ item と引数 X をプロセス P に返信し、受信した値 V を返します。簡単なプログラムですが、これでもコルーチンとして動作します。

それでは実際に試してみましょう。

リスト : 簡単なテスト

test_c1() ->
    io:write(yield(1)), io:write(one), 
    io:write(yield(2)), io:write(two), 
    io:write(yield(3)). io:write(three).
> P = spawn(prcs, test_c1, []).
<...>
> prcs:resume(P, ok).
okone1
> prcs:resume(P, ok).
oktwo2
> prcs:resume(P, ok).
okthree3
> prcs:resume(P, ok).
false

spawn でコルーチン (プロセス) を生成します。関数 test_c1/1 は yield(1) で処理を中断します。resume(P, ok) を実行すると、test_c1 から 1 が送信されて、test_c1 の処理が再開されます。その結果、ok と one が表示されて yield(2) で再び処理を中断します。その間に Eshell で実行した resume がメッセージを受信して、その返り値 1 が表示されます。あとは、resume を繰り返すたびに test_c2 から値を取得することができます。

●高階関数をジェネレータに変換

プロセスを使うと高階関数を「ジェネレータ (generator)」に変換することも簡単にできます。ジェネレータは呼び出されるたびに新しい値を生成していく関数と考えてください。たとえば、Erlang の関数 rand.uniform/0 は実行するたびに乱数を返します。つまり、uniform は乱数列を発生するジェネレータと考えることができます。

簡単な例として、リストの要素を順番に取り出すジェネレータを作ってみましょう。これは、foreach と yield を使って簡単にプログラムすることができます。

> P1 = spawn(lists, foreach, [fun prcs:yield/1, [1, 2, 3, 4, 5]]).
<...>
> prcs:resume(P1, ok).
1
> prcs:resume(P1, ok).
2
> prcs:resume(P1, ok).
3
> prcs:resume(P1, ok).
4
> prcs:resume(P1, ok).
5
> prcs:resume(P1, ok).
false

もう一つ、リストを「木」とみなして、木を巡回する高階関数 for_each_tree/2 を考えてみましょう。for_each_tree は次のように簡単にプログラムできます。

リスト : 木の巡回

for_each_tree(F, [X | Xs]) ->
    for_each_tree(F, X), for_each_tree(F, Xs);
for_each_tree(_, []) -> ok;
for_each_tree(F, X) -> F(X).

引数 F が要素に適用する関数、第 2 引数がリスト (木) です。最初の節で、木が [X | Xs] とマッチングする場合は、for_each_tree を再帰呼び出しして部分木 X と Xs をたどります。2 番目の節で、木が空の場合は ok を返します。3 番目の節が選択される場合、X は木の要素になるので、関数 F(X) を呼び出します。

簡単な実行例を示します。

> prcs:for_each_tree(fun(X) -> io:write(X), io:nl() end, [a, [b, [c, d], e], f]).
a
b
c
d
e
f
ok

このような高階関数も簡単にジェネレータに変換することができます。

> P2 = spawn(prcs, for_each_tree, [fun prcs:yield/1, [a, [b, [c, d], e], f]]).
<...>
> prcs:resume(P2, ok).
a
> prcs:resume(P2, ok).
b
> prcs:resume(P2, ok).
c
> prcs:resume(P2, ok).
d
> prcs:resume(P2, ok).
e
> prcs:resume(P2, ok).
f
> prcs:resume(P2, ok).
false

resume を呼び出すたびに、木の要素を順番に取り出して返します。要素がなくなるとプロセスが終了するので、返り値は false になります。

なお、次のようにジェネレータを生成する関数 make_generator/2 を定義することもできます。

リスト : ジェネレータの生成

make_generator(Module, Func, Args) ->
    spawn(Module, Func, [fun prcs:yield/1 | Args]).
> P3 = prcs:make_generator(lists, foreach, [[1, 2, 3, 4, 5]]).
<...>
> prcs:resume(P3, ok).
1
> prcs:resume(P3, ok).
2
> prcs:resume(P3, ok).
3
> prcs:resume(P3, ok).
4
> prcs:resume(P3, ok).
5
> prcs:resume(P3, ok).
false

●順列の生成

次は順列を生成するジェネレータを作ってみましょう。順列を生成する高階関数を用意すれば簡単にジェネレータを作ることができます。異なる n 個の順列の総数は、n の階乗 (n!) だけあります。たとえば、3 つの要素 a, b, c の順列は次に示すように 6 通りあります。

a b c,  a c b,  b a c,  b c a,  c a b,  c b a

順列を生成するプログラムは再帰定義で簡単に作ることができます。[a, b, c] の順列を生成する場合、最初に a で始まる順列を生成します。これは a を取り除いた数字 [b, c] の順列を生成することで実現できます。次は b で始まる順列を生成します。同様に、b を取り除いた数字 [a, c] の順列を生成すればいいわけです。[b, c] や [a, c] の順列を生成する場合も同じように考えることができます。

プログラムは次のようになります。

リスト : 順列の生成 (1)

permutation(F, [], A) -> F(lists:reverse(A));
permutation(F, Xs, A) ->
    lists:foreach(fun(X) -> permutation(F, lists:delete(X, Xs), [X | A]) end, Xs).
permutation(F, Xs) -> permutation(F, Xs, []).

関数 permutation/2 は高階関数で、引数 Xs がリスト、引数 F が生成した順列に適用する関数です。実際の処理は関数 permutation/3 で行います。引数 F が関数、引数 Xs がリスト、引数 A は累積変数で、選んだ数字を格納するリストです。最初の節で、リストが空リストの場合、順列が一つ完成したので lists:reverse で A を逆順にして関数 F に渡します。選んだ数字はリストの先頭に追加していくので、逆順になることに注意してください。

そうでなければ 2 番目の節で、数字を一つ選んで permutation を再帰呼び出しします。数字の選択はリストの先頭から順番に行えばいいので、lists:foreach を使っています。無名関数でリストの要素 X を受け取り、この中で premutation を再帰呼び出しします。permutation の第 2 引数は Xs から X を削除したリスト、第 3 引数は選択した数字のリスト A に X を追加したものです。これで数字 X を選択したことになります。

それでは、実際に試してみましょう。

> prcs:permutation(fun io:write/1, [a, b, c]).
[a,b,c][a,c,b][b,a,c][b,c,a][c,a,b][c,b,a]ok

permutation をジェネレータに変換することも簡単です。

> P4 = spawn(prcs, permutation, [fun prcs:yield/1, [a, b, c]]).
<...>
> prcs:resume(P4, ok).
[a,b,c]
> prcs:resume(P4, ok).
[a,c,b]
> prcs:resume(P4, ok).
[b,a,c]
> prcs:resume(P4, ok).
[b,c,a]
> prcs:resume(P4, ok).
[c,a,b]
> prcs:resume(P4, ok).
[c,b,a]
> prcs:resume(P4, ok).
false

正常に動作していますね。

●エラトステネスの篩

次はプロセスを使って素数を求めるプログラムを作ってみましょう。考え方は簡単です。最初に、2 から始まる整数列を生成するプロセス (コルーチン) を用意します。この場合、コルーチンは「遅延ストリーム」として機能します。

一般に、データの流れを抽象化したデータ構造を「ストリーム (stream)」と呼びます。たとえば、ファイル入出力はストリームと考えることができます。また、リストを使ってストリームを表すこともできます。ただし、単純なリストでは有限個のデータの流れしか表すことができません。ところが、遅延評価を用いると擬似的に無限個のデータを表すことができます。これを「遅延ストリーム」と呼びます。

遅延ストリームの基本的な考え方は、必要になったときに新しいデータを生成することです。このときに遅延評価を用います。具体的にはデータを生成する関数を用意し、それを遅延評価してストリームに格納しておきます。そして、必要になった時点で遅延評価しておいた関数を呼び出して、新しいデータを求めればよいわけです。

Erlang の場合、コルーチンを使えば「遅延ストリーム」を作成することができるので、プログラムはとても簡単になります。2 は素数なので、この整数列から 2 で割り切れる整数を取り除き除きます。ここでもコルーチンを使って、入力ストリームから 2 で割り切れる整数を取り除いたストリームを返すフィルターを作ります。

2 で割り切れる整数が取り除かれたので、次の要素は 3 になります。今度は 3 で割り切れる整数を取り除けばいいのです。これもフィルターを使えば簡単です。このとき、入力用のストリームは 2 で割り切れる整数が取り除かれています。したがって、このストリームに対して 3 で割り切れる整数を取り除くようにフィルターを設定すればいいわけです。

このように、素数を見つけたらそれで割り切れる整数を取り除いていくアルゴリズムを「エラトステネスの篩」といいます。ようするに、2 から始まる整数ストリームに対して、見つけた素数 2, 3, 5, 7, 11, ... を順番にフィルターで設定して素数でない整数をふるい落としていくわけです。

プログラムは次のようになります。

リスト : エラトステネスの篩

% 整数列の生成
integers(N) -> yield(N), integers(N + 1).

% フィルター
stream_filter(Pred, Src) ->
    X = resume(Src, ok),
    case Pred(X) of
        true -> yield(X), stream_filter(Pred, Src);
        false -> stream_filter(Pred, Src)
    end.

% エラトステネスの篩
sieve(0, _) -> ok;
sieve(N, Src) ->
    Prime = resume(Src, ok),
    io:format('~w ', [Prime]),
    sieve(N - 1, spawn(prcs, stream_filter, [fun(X) -> X rem Prime =/= 0 end, Src])).
sieve(N) -> sieve(N, spawn(prcs, integers, [2])).

関数 integers/1 は n から始まる整数列を生成する関数です。再帰呼び出しの停止条件がないので無限ループになりますが、コルーチンとして生成すれば正常に動作します。

関数 stream_filter/2 はストリーム Src から述語 Pred を満たす要素を取り出す関数です。resume で Src から要素 X を取り出します。case 式で Pred(X) を呼び出し、返り値が true であれば、yield(X) を評価してから stream_filter を再帰呼び出しします。false の場合は yield(X) を呼び出さずに stream_filter を再帰呼び出しします。この関数もコルーチンとして生成すれば、遅延ストリームとして使用することができます。

素数を求める関数 sieve も簡単です。引数 N は求める素数の個数です。sieve/1 は 2 から始まる整数列 (ストリーム) を spawn と integers で生成して sieve/2 に渡します。このストリームの先頭要素が素数になります。resume でストリームから素数を取り出して Prime にセットします。次に Prime を表示して、Prime で割り切れる整数を取り除くフィルターを生成して sieve を再帰呼び出しします。つまり、N 個の素数を求めるために N 個のフィルターをストリームに重ねていくわけです。

それでは実際に sieve(100) を実行してみましょう。

> prcs:sieve(100).
2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103
107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199 211
223 227 229 233 239 241 251 257 263 269 271 277 281 283 293 307 311 313 317 331
337 347 349 353 359 367 373 379 383 389 397 401 409 419 421 431 433 439 443 449
457 461 463 467 479 487 491 499 503 509 521 523 541 ok

正常に動作していますね。ただし、実行速度は遅いので引数 N には大きな値を与えないように注意してくださいね。


●プログラムリスト

%
% prcs.erl : Erlang の並行処理 (プロセス)
%
%            Copyright (C) 2018 Makoto Hiroi
%
-module(prcs).
-export([test/2, test_a/1, test_b/1, print_code/2, echo/0, send_color/3, receive_color/0]).
-export([resume/2, yield/1, permutation/2, integers/1, stream_filter/2, sieve/1]).
-export([make_generator/3, for_each_tree/2, test_c1/0, test_c2/2]).

test(0, _) -> ok;
test(N, Name) ->
    io:format('~w ~w~n', [N, Name]),
    timer:sleep(500),
    test(N - 1, Name).

%
% メッセージの送信と受信
%
flush() ->
  receive
    M -> io:format('~w~n', [M]), flush()
  after
    0 -> ok
  end.

echo() ->
  receive
    {Name, N} -> io:format('~w ~w~n', [Name, N]), echo();
    stop -> flush()
  end.

% データを受け取って表示する
receive_color() ->
  receive
    {P, Color} ->
      io:format('~w ', [Color]),
      P ! ok,
      receive_color();
    R ->
      io:format('illegal data ~w~n', [R]), false
    end.

% データを送る
send_color(0, _, _) -> ok;
send_color(N, Color, P) ->
    P ! {self(), Color},
    receive
      R -> R
    end,
    send_color(N - 1, Color, P).

%
% プロセスの同期
%
print_code(Code, Ch) ->
    receive
        stop -> ok;
        _ -> io:format('~s', [Code]), Ch ! ok, print_code(Code, Ch)
    end.

test_a(0, Ps) -> lists:foreach(fun(P) -> P ! stop end, Ps), ok;
test_a(N, Ps) ->
    lists:foreach(fun(P) -> P ! ok, receive _ -> ok end end, Ps),
    test_a(N - 1, Ps).

test_a(N) -> test_a(N, lists:map(fun(X) -> spawn(prcs, print_code, [X, self()]) end,
                                 ["h", "e", "y", "!", " "])).

test_b(0, Ps) -> lists:foreach(fun(P) -> P ! stop end, Ps), ok;
test_b(N, Ps) ->
    hd(Ps) ! ok,
    receive _ -> ok end,
    test_b(N - 1, Ps).

test_b(N) ->
    A = spawn(prcs, print_code, [" ", self()]),
    B = spawn(prcs, print_code, ["!", A]),
    C = spawn(prcs, print_code, ["y", B]),
    D = spawn(prcs, print_code, ["e", C]),
    E = spawn(prcs, print_code, ["h", D]),
    test_b(N, [E, D, C, B, A]).

%
% コルーチン
%

% プロセス P から値を取得する
resume(P, V) ->
  case is_process_alive(P) of
    true  -> P ! {next, V, self()}, receive {item, R} -> R end;
    false -> false
  end.

% 値を返す
yield(X) -> receive {next, V, P} -> P ! {item, X}, V end.

test_c1() -> 
    io:write(yield(1)), io:write(one), 
    io:write(yield(2)), io:write(two), 
    io:write(yield(3)), io:write(three).

test_c2(P, Name) ->
    case resume(P, ok) of
        false -> ok;
        M -> io:format('~s ~w~n', [Name, M]), test_c2(P, Name)
    end.

%
% ジェネレータの生成
%
make_generator(Module, Func, Args) -> spawn(Module, Func, [fun prcs:yield/1 | Args]).

% 木の巡回
for_each_tree(F, [X | Xs]) -> for_each_tree(F, X), for_each_tree(F, Xs);
for_each_tree(_, []) -> ok;
for_each_tree(F, X) -> F(X).

% 順列の生成
permutation(F, [], A) -> F(lists:reverse(A));
permutation(F, Xs, A) ->
  lists:foreach(fun(X) -> permutation(F, lists:delete(X, Xs), [X | A]) end, Xs).
permutation(F, Xs) -> permutation(F, Xs, []).

%
% エラトステネスの篩
%

% 整数列の生成
integers(N) -> yield(N), integers(N + 1).

% フィルター
stream_filter(Pred, Src) ->
  X = resume(Src, ok),
  case Pred(X) of
    true -> yield(X), stream_filter(Pred, Src);
    false -> stream_filter(Pred, Src)
  end.

sieve(0, _) -> ok;
sieve(N, Src) ->
  Prime = resume(Src, ok),
  io:format('~w ', [Prime]),
  sieve(N - 1,
        spawn(prcs, stream_filter, [fun(X) -> X rem Prime =/= 0 end, Src])).
sieve(N) -> sieve(N, spawn(prcs, integers, [2])).

初出 2011 年 10 月 22 日
改訂 2018 年 12 月 30 日

Copyright (C) 2011-2018 Makoto Hiroi
All rights reserved.

[ PrevPage | Erlang | NextPage ]