M.Hiroi's Home Page

Functional Programming

お気楽 Erlang プログラミング入門

[ PrevPage | Erlang | NextPage ]

プロセス (後編)

●並列プログラミング

今回は Erlang で「並列 (parallel) プログラミング」に挑戦してみましょう。Erlang の場合、並列プログラミングを行うための特別な設定はありません。CPU がマルチコアであれば、Erlang のプロセスは並列に動作します。M.Hiroi のパソコンの CPU (CORE i5) は物理コア数が 2 で、1 コアにつきハイパースレッディングで 2 分割できるので、論理 CPU の数 (logical cpus) は 4 になります。

なお、Erlang が認識している論理 CPU の数は、関数 erlang:system_info() の引数に logcial_processors を指定すると確認できます。

> erlang:system_info(logical_processors).
4

●プロセスの終了待ち

複数のプロセスを並行 (並列) に実行したあと、それらの結果を使って何らかの処理を行いたい場合、プロセスがすべて終了するまで待つ必要があります。プロセスの終了待ちはメッセージを使うと簡単に実現することができます。次のリストを見てください。

リスト : プロセスの終了待ち

-module(para).
-export([execute/3, reduce/3, test/0, test/2]).

execute(F, Args, P) -> P ! {ok, apply(F, Args)}. 

reduce(0, _, A) -> A;
reduce(N, F, A) ->
    receive
       {ok, R} -> reduce(N - 1, F, F(A, R))
    end.

test(0, Name) -> Name;
test(N, Name) -> 
    io:format('~b ~w~n', [N, Name]),
    timer:sleep(500),
    test(N - 1, Name).

test() -> 
    spawn(para, execute, [fun para:test/2, [7, foo], self()]),
    spawn(para, execute, [fun para:test/2, [6, bar], self()]),
    spawn(para, execute, [fun para:test/2, [5, baz], self()]),
    reduce(3, fun(A, X) -> io:format('~w terminated~n', [X]), A end, ok).

関数 execute/3 は引数 F に渡された関数を実行し、それが終了したら引数 P のプロセスにその結果を送信します。高階関数 apply(F, Args) は関数 F に引数 Args を渡して実行します。apply/2 は Lisp / Scheme の関数 apply と同じ動作です。たとえば、apply(F, [1, 2, 3]) は F(1, 2, 3) を呼び出すことと同じになります。モジュール名、関数名、引数を受け取る関数 apply/3 もあります。

簡単な使用例を示しましょう。

> lists:member(1, [1, 2, 3, 4, 5]).
true
> apply(fun lists:member/2, [1, [1, 2, 3, 4, 5]]).
true
> apply(lists, member, [5, [1, 2, 3, 4, 5]]).
true

関数 reduce/3 は N 個のプロセスの結果を受信して、その値と累積変数 A を関数 F で畳み込みます。test/0 は spawn と execute で test/2 を実行するプロセスを 3 つ生成します。あとは、それらが終了するまで reduce/3 で待つだけです。

それでは、実際に試してみましょう。

> para:test().
7 foo
6 bar
5 baz
6 foo
5 bar
4 baz
5 foo
4 bar
3 baz
4 foo
3 bar
2 baz
3 foo
2 bar
1 baz
2 foo
1 bar
baz terminated
1 foo
bar terminated
foo terminated
ok
>

このように、3 つの test/2 が終了するまで test/0 を待機させることができます。

●並列処理の効果

次は、フィボナッチ関数を使って並列処理の効果を試してみましょう。

リスト : 並列処理の効率

fibo(0) -> 0;
fibo(1) -> 1;
fibo(N) when N > 1 -> fibo(N - 1) + fibo(N - 2).

test_seq(0, _) -> ok;
test_seq(N, M) -> fibo(M), test_seq(N - 1, M).

test_sub(0, _) -> ok;
test_sub(N, M) -> spawn(para, execute, [fun para:fibo/1, [M], self()]), test_sub(N - 1, M).

test_para(N, M) -> test_sub(N, M), reduce(N, fun(A, _) -> A end, ok).

関数 fibo/1 はフィボナッチ数を求めます。fibo は二重再帰なので実行時間はとても遅くなります。関数 test_seq/2 は fibo(M) を順番に N 回計算します。fibo(M) の実行時間が t とすると、test_seq の実行時間は t * N になります。関数 test_para/2 は fibo(M) を並列で N 回計算します。N が 4 以下の場合、うまくいけば test_para の実行時間は t に近い値になると思われます。

実行結果は次のようになりました。

> timer:tc(para, fibo, [38]).
{1875000,39088169}
> timer:tc(para, test_seq, [2, 38]).
{3672000,ok}
> timer:tc(para, test_para, [2, 38]).
{1969000,ok}
> timer:tc(para, test_seq, [3, 38]).
{5563000,ok}
> timer:tc(para, test_para, [3, 38]).
{2844000,ok}
> timer:tc(para, test_seq, [4, 38]).
{7422000,ok}
> timer:tc(para, test_para, [4, 38]).
{3453000,ok}

実行環境 : Windows 10, Intel Core i5-6200U 2.30GHz

fibo(38) の実行時間は 1.875 秒になりました。test_seq の実行時間はおおむね N に比例しています。test_para の場合、実行時間はおおむね test_seq の約 1 / 2 になっています。N = 4 のとき、test_para の実行速度は test_seq の 1 / 4 にはなりませんでしたが、並列処理の効果は十分に出ていると思います。

●数値積分

次は数値積分で円周率πを求めてみましょう。区間 [a, b] の定積分 \(\int_a^b f(x)\,dx\) を数値的に求めるには、区間を細分して小区間の面積を求めて足し上げます。小区間の面積を求める一番簡単な方法は長方形で近似することです。この場合、3 つの方法が考えられます。

  1. (b - a) * f(a)
  2. (b - a) * f(b)
  3. (b - a) * f((a + b) / 2)

1 は左端の値 f(a) を、2 は右端の値 f(b) を、3 は中間点の値 f((a + b) / 2) を使って長方形の面積を計算します。この中で 3 番目の方法が一番精度が高く、これを「中点則」といいます。このほかに、台形で近似する「台形則」や、2 次近似で精度を上げる「シンプソン則」という方法があります。

それでは実際に、1 の方法と中点則で \(\pi\) の値を求めてみましょう。\(\pi\) は次の式で求めることができます。

\( \pi = \displaystyle \int_0^1 \dfrac{4}{1 + x^2}\,dx \)

プログラムは次のようになります。

リスト : 数値積分で円周率を求める

% 左端
leftpoint(M, M, W, A) -> A * W;
leftpoint(N, M, W, A) ->
    X = N * W,
    leftpoint(N + 1, M, W, A + 4.0 / (1.0 + X * X)).
leftpoint(N) -> leftpoint(0, N, 1.0 / N, 0.0).

% 中点則
midpoint(M, M, W, A) -> A * W;
midpoint(N, M, W, A) ->
    X = (N + 0.5) * W,
    midpoint(N + 1, M, W, A + 4.0 / (1.0 + X * X)).
midpoint(N) -> midpoint(0, N, 1.0 / N, 0.0).

関数 leftPoint/1 は 1 の方法で、関数 midPoint/1 が中点則でπの値を求めます。引数 N が分割数です。実際の処理は leftpoit/4 と midpoint/4 で行います。引数 W が小区間の幅を、引数 A が面積を表します。あとは末尾再帰で区間 [0, 1] を N 個に分割して面積を求めます。

最初に x 座標を計算します。leftPoint は N * W でいいのですが、midPoint は中間点を求めるため (N + 0.5) * W で求めます。たとえば、変数 N が 0 の場合は 0.5 になるので、x は区間 [0 * w, 1 * w] の中間点になります。あとは、4.0 / (1 + X * X) を計算して A に加算します。最後に A に W を掛け算して全体の面積を求めます。

実行結果を示します。

> lists:foreach(fun(X) -> P = para:leftpoint(X), 
io:format('~w ~w~n', [P, P - math:pi()]) end, [10, 100, 1000, 10000]).
3.2399259889071588 0.09833333531736566
3.151575986923129 0.009983333333335676
3.1425924869231245 9.998333333314235e-4
3.14169265192314 9.999833334672914e-5
ok
> lists:foreach(fun(X) -> P = para:midpoint(X), 
io:format('~w ~w~n', [P, P - math:pi()]) end, [10, 100, 1000, 10000]).
3.1424259850010987 8.333314113055934e-4
3.1416009869231254 8.333333332277704e-6
3.1415927369231227 8.333332957022321e-8
3.141592654423134 8.3334095180021e-10
ok

関数 math:pi/0 は円周率を返します。中点則の場合、分割数を 10 倍すると誤差はほぼ 1/100 になります。それに対し、1 の方法は分割数を 10 倍しても誤差は 1 / 10 にしかなりません。このように、1 の方法は分割数を増やさないと精度の高い値を求めることができません。

ただし、浮動小数点数の計算には誤差があるので、精度には限界があります。中点則の場合、分割数を 1,000,000 より増やしても精度は高くなりません。1 の方法は分割数を増やすと誤差は少なくなりますが、実行時間がかかるようになります。そこで、並列処理を使って実行時間を短縮してみましょう。

●数値積分の並列化

並列化の考え方は簡単です。たとえば、4 つのプロセスで並列化するのであれば、区間を [0, 0.25], [0.25, 0.5], [0.5, 0.75], [0.75, 1] のように四等分して、それぞれの区間を 1 つのプロセスで並列に計算します。あとは、その値の足し算すればいいわけです。

プログラムは次のようになります。

リスト : 数値積分で円周率を求める (並列化)

leftpoint_para(0, _, _) -> ok;
leftpoint_para(N, M, W) ->
    X = N * M,
    spawn(para, execute, [fun para:leftpoint/4, [X - M, X, W, 0.0], self()]),
    leftpoint_para(N - 1, M, W).

leftpoint_para(N) ->
    K = 100000000,
    leftpoint_para(N, K / N, 1 / K),
    reduce(N, fun(A, X) -> A + X end, 0.0).

leftpoint_para/1 の引数 N がプロセスの数、変数 K が分割数です。関数 leftpoint/3 の引数 N, M は区間を、W が小区間の幅を表します。W は 1 / K になります。leftpoint_para/3 で spawn と execute でプロセスを生成して並列に実行します。あとは、leftpoint_para/1 でプロセスが終了するのを reduce で待機し、その結果をすべて足し算します。

それでは実行結果を示します。

> timer:tc(para, leftpoint_para, [1]).
{8578000,3.141592663590225}
> timer:tc(para, leftpoint_para, [2]).
{4610000,3.1415926635902114}
> timer:tc(para, leftpoint_para, [4]).
{4719000,3.1415926635898765}
> timer:tc(para, leftpoint_para, [8]).
{4734000,3.1415926635897886}

実行環境 : Windows 10, Intel Core i5-6200U 2.30GHz

並列に実行することで約 1.8 から 1.9 倍速くなりました。物理コア数が多い CPU だと、並列処理の効果はもっと大きくなると思われます。興味のある方は試してみてください。

●哲学者の食事

最後に、「哲学者の食事」という並行プログラミングでは有名な問題を解いてみましょう。

[哲学者の食事]

5 人の哲学者が丸いテーブルに座っています.テーブルの中央にはスパゲッティが盛られた大皿があり、哲学者の間には 5 本のフォークが置かれています。哲学者は思索することとスパゲッティを食べることを繰り返します。食事のときには 2 本のフォークを持たなければなりません。食事が終わると 2 本のフォークを元の位置に戻します。

詳しい説明は 食事する哲学者の問題 -- Wikipedia をお読みください。

それではプログラムを作りましょう。最初にフォークを管理する関数を定義します。

リスト : フォークを操作する関数

% フォークの管理
forks(Fs) ->
    receive
        {P, get, Fork} ->
            case lists:member(Fork, Fs) of
                true  -> P ! Fork, forks(lists:delete(Fork, Fs));
                false -> P ! wait, forks(Fs)
            end;
        {P, return, Fork} ->
            P ! ok, forks([Fork | Fs]);
        R -> io:format('illegal message ~w~n', [R]), false
    end.

% フォークの取得
get_fork(P, Fork) ->
    P ! {self(), get, Fork},
    receive
        wait -> timer:sleep(100), get_fork(P, Fork);
        Fork -> true
    end.

% フォークの返却
return_fork(P, Fork) ->
    P ! {self(), return, Fork},
    receive
        _ -> true
    end.

関数 forks はフォークをリスト Fs で受け取ります。そして、メッセージ {P, get, Fork} を受け取ると、要求された Fork が Fs にあればそれをプロセス P に送信して、Fs から Fork を削除します。Fork が無い場合は P にメッセージ wait を送信します。次の {P, return, Fork} はフォークを返却するメッセージです。P に ok を送信してから Fs に Fork を追加します。

forks は末尾再帰で無限ループを構成していることと、引数 Fs でフォークの有無 (状態) を管理していることに注意してください。関数 get_fork はフォーク Fork を要求するメッセージを送信します。wait が返ってきたら 100 msec 待ってから再度メッセージを送信します。関数 return_fork は Fork を返却するメッセージを送信します。return_fork もメッセージを受信するまで待ちます。

次は哲学者の動作をプログラムします。次のリストを見てください。

リスト : 哲学者の動作

person0(M, 0, _, _, _) ->
    io:format('Philosopher~w is sleeping~n', [M]), ok;
person0(M, N, P, Fork_r, Fork_l) ->
    io:format('Philosopher~w is thinking~n', [M]),
    timer:sleep(1000),
    get_fork(P, Fork_r),
    get_fork(P, Fork_l),
    io:format('Philosopher~w is eating~n', [M]),
    timer:sleep(100),
    return_fork(P, Fork_r),
    return_fork(P, Fork_l),
    person0(M, N - 1, P, Fork_r, Fork_l).

関数 person0 の引数 M は哲学者の番号を表します。引数 N は食事をする回数です。0 になったら処理を終了します。引数 P はフォークを管理するプロセス、Fork_r が右側のフォーク、Fork_r が左側のフォークです。哲学者が食事をする場合、最初に get_fork で右側のフォークを取り、次に左側のフォークを取ります。食事を終えたら return_fork で右側のフォークを返却し、次に左側のフォークを返却します。

このように、プロセスを使うと簡単にプログラムできますが、実は並行プログラミング特有の大きな問題点があるのです。これはプログラムを実行してみるとわかります。

●実行結果 (1)

プログラムの実行は関数 test_person で行います。

リスト : 実行

test_person(P, F) ->
    spawn(para, F, [1, 2, P, a, b]),
    spawn(para, F, [2, 2, P, b, c]),
    spawn(para, F, [3, 2, P, c, d]),
    spawn(para, F, [4, 2, P, d, e]),
    spawn(para, F, [5, 2, P, e, a]).

test_person を実行する前に、spawn で forks を実行します。そのプロセス識別子を引数 P に渡します。引数 F は関数名 (person0) を表します。あとは、5 人の哲学者を spawn で起動するだけです。フォークはアトム a, b, c, d, e で表しています。哲学者は円形に並んでいるので、5 人目の左側のフォークが 1 人目の右側のフォークになります。

実行結果は次のようになります。

> P = spawn(para, forks, [[a, b, c, d, e]]).
<...>
> para:test_person(P, person0).
Philosopher1 is thinking
Philosopher2 is thinking
Philosopher3 is thinking
Philosopher4 is thinking
Philosopher5 is thinking

このように、すべてのプロセスが待ち状態となり先へ進むことができなくなります。これを「デッドロック (deadlock)」といいます。哲学者全員が右側のフォークを取り、左側のフォークが置かれるのを待つときにデッドロックとなるわけです。

●デッドロックの防止

デッドロックを防止する簡単な方法は、右側のフォークを取っても左側のフォークを取れないときは、右側のフォークを元に戻すことです。プログラムは次のようになります。

リスト : デッドロックの防止 (1)

get_fork1(P, Fork) ->
    P ! {self(), get, Fork},
    receive
        wait -> false;
        Fork -> true
    end.

person1(M, 0, _, _, _) ->
    io:format('Philosopher~w is sleeping~n', [M]), ok;
person1(M, N, P, Fork_r, Fork_l) ->
    io:format('Philosopher~w is thinking~n', [M]),
    timer:sleep(1000),
    get_fork(P, Fork_r),
    case get_fork1(P, Fork_l) of
        true ->
            io:format('Philosopher~w is eating~n', [M]),
            timer:sleep(100),
            return_fork(P, Fork_r),
            return_fork(P, Fork_l),
            person1(M, N - 1, P, Fork_r, Fork_l);
        false ->
            return_fork(P, Fork_r),
            person1(M, N, P, Fork_r, Fork_l)
    end.

右側のフォークを取ったあと、関数 get_fork1 で左側のフォークを要求します。フォークを受け取った場合は true を返すので、食事をすることができます。false の場合は右側のフォークを返却して思索に戻ります。

Lua のようなノンプリエンプティブなコールチンの場合、これでデッドロックを解消して正常に動作するのですが、プリエンプティブなプロセスでは新たな問題が発生します。

●実行結果 (2)

実行結果は次のようになります。

> para:test_person(Fk, person1).
Philosopher1 is thinking
Philosopher2 is thinking
Philosopher3 is thinking
Philosopher4 is thinking
Philosopher5 is thinking
<...>
> Philosopher5 is thinking
> Philosopher4 is thinking
> Philosopher3 is thinking
> Philosopher2 is thinking
> Philosopher1 is thinking
> Philosopher1 is thinking
> Philosopher2 is thinking
> Philosopher3 is thinking
> Philosopher4 is thinking
> Philosopher5 is thinking

哲学者全員が右側のフォークを受け取っては返却することを繰り返すため、次の状態へ進むことができません。デッドロックではありませんが、無限ループに陥っているわけです。このような状態を「ライブロック (livelock)」といいます。

●ライブロックの解消

哲学者の食事問題の場合、ライブロックを解消する簡単な方法があります。フォークが残り 1 本の場合、右側のフォークを要求されたらそれを待たせることにするのです。左側のフォークであれば、その要求を受け付けます。4 人の哲学者が右側のフォークを持ったとき、5 人目の哲学者は右側のフォークを持つことができません。次に、4 人のうちの誰かが左側のフォークを要求し、それが受け付けられるので、最低でもひとりの哲学者が食事をすることができます。

プログラムは次のようになります。

リスト : ライブロックの解消

% フォークの管理
forks1a(Fs) ->
    receive
        {P, get, Side, Fork} ->
            case lists:member(Fork, Fs) of
                % 左側のフォークは最後の 1 本でも渡す
                true when length(Fs) > 1; Side =:= left -> P ! Fork, forks1a(lists:delete(Fork, Fs));
                _ -> P ! wait, forks1a(Fs)
            end;
        {P, return, Fork} -> P ! ok, forks1a([Fork | Fs]);
        R -> io:format('illegal message ~w~n', [R]), false
    end.

% フォークを要求する
get_fork1a(P, Side, Fork) ->
    P ! {self(), get, Side, Fork},
    receive
        Fork -> true;
        wait when Side =:= left -> false;
        wait -> timer:sleep(50), get_fork1a(P, Side, Fork)  % 右は待つ
    end.

% 哲学者の動作
person1a(M, 0, _, _, _) ->
    io:format('Philosopher~w is sleeping~n', [M]), ok;
person1a(M, N, P, Fork_r, Fork_l) ->
    io:format('Philosopher~w is thinking~n', [M]),
    timer:sleep(1000),
    get_fork1a(P, right, Fork_r),
    case get_fork1a(P, left, Fork_l) of
        true ->
            io:format('Philosopher~w is eating~n', [M]),
            timer:sleep(100),
            return_fork(P, Fork_r),
            return_fork(P, Fork_l),
            person1a(M, N - 1, P, Fork_r, Fork_l);
        false ->
            return_fork(P, Fork_r),
            person1a(M, N, P, Fork_r, Fork_l)
  end.

フォークを要求するメッセージに左右を区別するデータ left, right を追加します。フォークを管理する関数 forks1a はフォークが要求されたとき、フォークが 2 本以上ある、または左側のフォークの場合は、そのフォークを渡します。それ以外の場合は wait を送信します。フォークを要求するメッセージを送信する関数 get_fork1a は、要求した Fork が返ってきた場合は true を返します。wait を受信したとき、左側のフォークであれば false を返し、右側のフォークは 50 msec 後に再度フォークを要求します。person1a は get_fork を get_fork1a に変更するだけです。

●実行結果 (3)

それでは実行してみましょう。

> Fk = spawn(para, forks1a, [[a, b, c, d, e]]).
<...>
> para:test_person(Fk, person1a).
Philosopher1 is thinking
Philosopher2 is thinking
Philosopher3 is thinking
Philosopher4 is thinking
Philosopher5 is thinking
<...>
> Philosopher4 is eating
> Philosopher1 is thinking
> Philosopher2 is thinking
> Philosopher3 is thinking
> Philosopher4 is thinking
> Philosopher5 is eating
> Philosopher5 is thinking
> Philosopher3 is eating
> Philosopher1 is thinking
> Philosopher2 is thinking
> Philosopher3 is thinking
> Philosopher4 is eating
> Philosopher4 is sleeping
> Philosopher5 is eating
> Philosopher5 is sleeping
> Philosopher2 is eating
> Philosopher1 is thinking
> Philosopher2 is thinking
> Philosopher3 is eating
> Philosopher3 is sleeping
> Philosopher1 is eating
> Philosopher1 is thinking
> Philosopher2 is eating
> Philosopher2 is sleeping
> Philosopher1 is eating
> Philosopher1 is sleeping

どの哲学者も 2 回食事をして睡眠まで到達しています。

●デッドロックの防止 (2)

もうひとつ簡単な方法を紹介しましょう。奇数番目の哲学者は、まず左側のフォークを取り上げてから右側のフォークを取り、偶数番目の哲学者は、今までのように右側のフォークを取り上げてから左側のフォークを取ります。こんな簡単な方法で動作するのは不思議なように思います。たとえば、哲学者が 2 人の場合を考えてみてください。

哲学者 0 の右側のフォークを A、左側のフォークを B とします。哲学者 1 からみると、B が右側のフォークで、A が左側のフォークになります。デッドロックは、哲学者 0 が A を取り、哲学者 1 が B を取ったときに発生します。ここで、哲学者 1 が左側のフォーク A から取るようにします。先に哲学者 0 が A を取った場合、哲学者 1 は A があくまで待つことになるので、哲学者 0 はフォーク B を取って食事をすることができます。哲学者 1 が先にフォーク A を取った場合も同じです。これでデッドロックを防止することができます。

プログラムは次のようになります。

リスト : デッドロックの防止 (2)

person2(M, 0, _, _, _) ->
    io:format('Philosopher~w is sleeping~n', [M]), ok;
person2(M, N, P, Fork_r, Fork_l) ->
    io:format('Philosopher~w is thinking~n', [M]),
    timer:sleep(1000),
    if
        M rem 2 =/= 0 ->
            get_fork(P, Fork_r),
            get_fork(P, Fork_l);
        true ->
            get_fork(P, Fork_l),
            get_fork(P, Fork_r)
    end,
    io:format('Philosopher~w is eating~n', [M]),
    timer:sleep(100),
    return_fork(P, Fork_r),
    return_fork(P, Fork_l),
    person2(M, N - 1, P, Fork_r, Fork_l).

if で M が偶数の場合は右側から、奇数の場合は左側のフォークから取るように処理を分けるだけです。

●実行結果 (4)

実行結果は次のようになります。

> Fk = spawn(para, forks, [[a, b, c, d, e]]).
<...>
> para:test_person(Fk, person2).
Philosopher1 is thinking
Philosopher2 is thinking
Philosopher3 is thinking
Philosopher4 is thinking
Philosopher5 is thinking
<...>
> Philosopher1 is eating
> Philosopher4 is eating
> Philosopher1 is thinking
> Philosopher4 is thinking
> Philosopher2 is eating
> Philosopher5 is eating
> Philosopher2 is thinking
> Philosopher5 is thinking
> Philosopher3 is eating
> Philosopher3 is thinking
> Philosopher1 is eating
> Philosopher4 is eating
> Philosopher1 is sleeping
> Philosopher4 is sleeping
> Philosopher2 is eating
> Philosopher5 is eating
> Philosopher2 is sleeping
> Philosopher5 is sleeping
> Philosopher3 is eating
> Philosopher3 is sleeping

正常に動作していますね。興味のある方はいろいろ試してみてください。

●参考文献, URL

  1. Paul Graham (著),野田 開 (訳), 『On Lisp』, Web 版
  2. Timothy Buddy (著), 吉田雄二 (監修), 長谷川明生・大田義勝 (訳), 『Little Smalltake 入門』, アスキー出版, 1989
  3. Ravi Sethi (著), 神林靖 (訳), 『プログラミング言語の概念と構造』, アジソンウェスレイ, 1995

●プログラムリスト

%
% para.erl : Erlang の並列処理
%
%            Copyright (C) 2018 Makoto Hiroi
%
-module(para).
-export([execute/3, reduce/3, test/0, test/2, fibo/1, test_seq/2, test_para/2, test_sub/2]).
-export([leftpoint/1, leftpoint/4, midpoint/1, leftpoint_para/1]).
-export([forks/1, person0/5, person1/5, test_person/2, forks1a/1, person1a/5, person2/5]).

%
% プロセスの終了待ち
%
execute(F, Args, P) -> P ! {ok, apply(F, Args)}. 

reduce(0, _, A) -> A;
reduce(N, F, A) ->
    receive
       {ok, R} -> reduce(N - 1, F, F(A, R))
    end.

test(0, Name) -> Name;
test(N, Name) -> 
    io:format('~b ~w~n', [N, Name]),
    timer:sleep(500),
    test(N - 1, Name).

test() -> 
    spawn(para, execute, [fun para:test/2, [7, foo], self()]),
    spawn(para, execute, [fun para:test/2, [6, bar], self()]),
    spawn(para, execute, [fun para:test/2, [5, baz], self()]),
    reduce(3, fun(A, X) -> io:format('~w terminated~n', [X]), A end, ok).

%
% 並列処理の効果
%
fibo(0) -> 0;
fibo(1) -> 1;
fibo(N) when N > 1 -> fibo(N - 1) + fibo(N - 2).

test_seq(0, _) -> ok;
test_seq(N, M) -> fibo(M), test_seq(N - 1, M).

test_sub(0, _) -> ok;
test_sub(N, M) -> spawn(para, execute, [fun para:fibo/1, [M], self()]), test_sub(N - 1, M).

test_para(N, M) -> test_sub(N, M), reduce(N, fun(A, _) -> A end, ok).

%
% 数値積分
%
leftpoint(M, M, W, A) -> A * W;
leftpoint(N, M, W, A) ->
    X = N * W,
    leftpoint(N + 1, M, W, A + 4.0 / (1.0 + X * X)).
leftpoint(N) -> leftpoint(0, N, 1.0 / N, 0.0).

midpoint(M, M, W, A) -> A * W;
midpoint(N, M, W, A) ->
    X = (N + 0.5) * W,
    midpoint(N + 1, M, W, A + 4.0 / (1.0 + X * X)).
midpoint(N) -> midpoint(0, N, 1.0 / N, 0.0).

% 並列化
leftpoint_para(0, _, _) -> ok;
leftpoint_para(N, M, W) ->
    X = N * M,
    spawn(para, execute, [fun para:leftpoint/4, [X - M, X, W, 0.0], self()]),
    leftpoint_para(N - 1, M, W).

leftpoint_para(N) ->
    K = 100000000,
    leftpoint_para(N, K / N, 1 / K),
    reduce(N, fun(A, X) -> A + X end, 0.0).

%
% 哲学者の食事
%

% フォークの管理
forks(Fs) ->
    receive
        {P, get, Fork} ->
            case lists:member(Fork, Fs) of
                true  -> P ! Fork, forks(lists:delete(Fork, Fs));
                false -> P ! wait, forks(Fs)
            end;
        {P, return, Fork} ->
            P ! ok, forks([Fork | Fs]);
        R -> io:format('illegal message ~w~n', [R]), false
    end.

% フォークの取得
get_fork(P, Fork) ->
    P ! {self(), get, Fork},
    receive
        wait -> timer:sleep(100), get_fork(P, Fork);
        Fork -> true
    end.

% フォークの返却
return_fork(P, Fork) ->
    P ! {self(), return, Fork},
    receive
        _ -> true
    end.

% 哲学者の動作
person0(M, 0, _, _, _) ->
    io:format('Philosopher~w is sleeping~n', [M]), ok;
person0(M, N, P, Fork_r, Fork_l) ->
    io:format('Philosopher~w is thinking~n', [M]),
    timer:sleep(1000),
    get_fork(P, Fork_r),
    get_fork(P, Fork_l),
    io:format('Philosopher~w is eating~n', [M]),
    timer:sleep(100),
    return_fork(P, Fork_r),
    return_fork(P, Fork_l),
    person0(M, N - 1, P, Fork_r, Fork_l).

%
% デッドロックの解消
%
get_fork1(P, Fork) ->
    P ! {self(), get, Fork},
    receive
        wait -> false;
        Fork -> true
    end.

person1(M, 0, _, _, _) ->
    io:format('Philosopher~w is sleeping~n', [M]), ok;
person1(M, N, P, Fork_r, Fork_l) ->
    io:format('Philosopher~w is thinking~n', [M]),
    timer:sleep(1000),
    get_fork(P, Fork_r),
    case get_fork1(P, Fork_l) of
        true ->
            io:format('Philosopher~w is eating~n', [M]),
            timer:sleep(100),
            return_fork(P, Fork_r),
            return_fork(P, Fork_l),
            person1(M, N - 1, P, Fork_r, Fork_l);
        false ->
            return_fork(P, Fork_r),
            person1(M, N, P, Fork_r, Fork_l)
    end.

%
% デッドロック (ライブロック) の解消
%

% フォークの管理
forks1a(Fs) ->
    receive
        {P, get, Side, Fork} ->
            case lists:member(Fork, Fs) of
                % 左側のフォークは最後の 1 本でも渡す
                true when length(Fs) > 1; Side =:= left -> P ! Fork, forks1a(lists:delete(Fork, Fs));
                _ -> P ! wait, forks1a(Fs)
            end;
        {P, return, Fork} -> P ! ok, forks1a([Fork | Fs]);
        R -> io:format('illegal message ~w~n', [R]), false
    end.

% フォークを要求する
get_fork1a(P, Side, Fork) ->
    P ! {self(), get, Side, Fork},
    receive
        Fork -> true;
        wait when Side =:= left -> false;
        wait -> timer:sleep(50), get_fork1a(P, Side, Fork)  % 右は待つ
    end.

% 哲学者の動作
person1a(M, 0, _, _, _) ->
    io:format('Philosopher~w is sleeping~n', [M]), ok;
person1a(M, N, P, Fork_r, Fork_l) ->
    io:format('Philosopher~w is thinking~n', [M]),
    timer:sleep(1000),
    get_fork1a(P, right, Fork_r),
    case get_fork1a(P, left, Fork_l) of
        true ->
            io:format('Philosopher~w is eating~n', [M]),
            timer:sleep(100),
            return_fork(P, Fork_r),
            return_fork(P, Fork_l),
            person1a(M, N - 1, P, Fork_r, Fork_l);
        false ->
            return_fork(P, Fork_r),
            person1a(M, N, P, Fork_r, Fork_l)
    end.

%
% デッドロックの解消 (その2)
%
person2(M, 0, _, _, _) ->
    io:format('Philosopher~w is sleeping~n', [M]), ok;
person2(M, N, P, Fork_r, Fork_l) ->
    io:format('Philosopher~w is thinking~n', [M]),
    timer:sleep(1000),
    if
        M rem 2 =/= 0 ->
            get_fork(P, Fork_r),
            get_fork(P, Fork_l);
        true ->
            get_fork(P, Fork_l),
            get_fork(P, Fork_r)
    end,
    io:format('Philosopher~w is eating~n', [M]),
    timer:sleep(100),
    return_fork(P, Fork_r),
    return_fork(P, Fork_l),
    person2(M, N - 1, P, Fork_r, Fork_l).

% 哲学者の食事問題を解く
test_person(P, F) ->
    spawn(para, F, [1, 2, P, a, b]),
    spawn(para, F, [2, 2, P, b, c]),
    spawn(para, F, [3, 2, P, c, d]),
    spawn(para, F, [4, 2, P, d, e]),
    spawn(para, F, [5, 2, P, e, a]).

初出 2011 年 10 月 22 日
改訂 2018 年 12 月 30 日

Copyright (C) 2011-2018 Makoto Hiroi
All rights reserved.

[ PrevPage | Erlang | NextPage ]