今回は C# の Task, async, await を使って、簡単な「並行プログラミング」に挑戦してみましょう。
「並行 (concurrent) プログラミング」は複数のプログラムを並行に実行しますが、このとき複数の CPU で同時に動かす場合と、一つの CPU で複数のプログラムを動かす場合があります。一般的には、前者を「並列 (parallel) プログラミング」といい、複数のハードウェアを並列に実行することによる処理速度の向上が主な目的となります。
後者の場合、一定時間毎に実行するプログラムを切り替えることで、複数のプログラムを並列に実行しているように見せることができます。この処理を「時分割 (time sharing)」もしくは「タイム・スライス (time slice)」といいます。一般に、タイム・スライスは OS でサポートされている機能です。OS が実行するプログラムのことを「プロセス (process)」または「タスク (task)」といいます。
並列的に動作するプログラムをひとつのプロセスだけで作るのはけっこう大変です。そこで、プロセス内では逐次的な処理にとどめ、複数のプロセス間で情報交換を行うことにより、全体で並列的な動作を実現することを考えます。このほうが自然にプログラムを記述できる場合があるのです。これが後者の主な目的となります。
プロセスは互いに独立したプログラムですが、OS にはプロセス間でデータをやり取りする機能 (プロセス間通信) が用意されています。たとえば、UNIX ライクな OS では「パイプ (pipe)」を使って複数のプログラム (コマンド) を連結することができます。この場合、パイプを通してデータがプログラムに送られ、各プログラムは並行に動作することになります。入出力処理で待たされるコマンドがあったとしても、その間に他のコマンドが実行されるので、各コマンドを逐次的に実行するよりも、効率的に処理することが可能です。
最近では、ひとつのプログラムの中で独立した複数の処理を記述できるようになりました。この機能を「スレッド (thread)」とか「マルチスレッド」いいます。スレッドは「縫い糸」という意味ですが、プログラムでは「制御の流れ」という意味で使われています。並列的な動作をプログラムする場合、逐次的な処理をひとつのスレッドに割り当て、複数のスレッドを並行に動作させることにより、全体で並列的な動作を実現するわけです。
一般に、スレッドは一定時間毎に実行するスレッドを強制的に切り替えます。このとき、スレッドのスケジューリングは処理系が行います。これを「プリエンプティブ (preemptive)」といいます。これに対し、Ruby のファイバーや Lua のコルーチンは、プログラムの実行を一定時間毎に切り替えるものではありません。他のプログラムが実行できるよう自主的に処理を中断する、といった協調的な動作を行わせることで、複数のプログラムを並行に動作させています。これを「ノンプリエンプティブ (nonpreemptive)」といいます。
スレッドは同じプロセス内に存在するので、メモリ空間を共有することができます。これを「共有メモリ」といいます。スレッド間の通信は共有メモリを使って簡単に行うことができますが、プリエンプティブなスレッドの場合、共有メモリのアクセス時に発生する「競合」が問題になります。このため、プリエンプティブなマルチスレッドをサポートしているプログラミング言語では、競合を回避するための仕組みが用意されています。
複数のスレッドを協調的に動作させる場合、スレッドの待ち合わせが必要になることがあります。この処理を「同期 (synchronization)」といいます。逆に、複数のスレッドを協調させずに動作させることを「非同期 (asynchronous)」といいます。C# の場合、マルチスレッドはクラス Thread を使ってプログラムすることができます。.NET Framework 4.5 からはより使いやすいクラス Task が、さらに C# 5.0 からは async, await という機能が導入され、現在では Task, async, await を使って非同期な処理をプログラムするのが主流のようです。
C# で非同期 (並列的) な処理をプログラムする場合、クラス Task を使うと便利です。Task を使うときは System.Threading.Tasks を using してください。コンストラクタ Task の引数には非同期に実行する処理をデリゲートまたはラムダ式で渡します。基本的なコンストラクタを以下に示します。
Task(Action action) => Task object Task(Action<T>, T x) => Task object Task<R>(Func<R> func) => Task<R> object Task<R>(Func<T, R> func, T x) => Task<R> object
Action と Func は C# に定義されている汎用のデリゲートです。簡単な説明は拙作のページ C# 入門: デリゲートとラムダ式 をお読みください。返り値が不要な場合は Action を、値を返したい場合は Func を使います。Action の場合、コンストラクタはクラス Task のインスタンスを返します。Func の場合はクラス Task<R> のインスタンスを返します。R は返り値の型を表します。
コンストラクタでタスクを生成したとき、引数で渡した処理はまだ実行されていません。タスクを実行するには以下のメソッドを使います。
1. task_obj.RunSynchronously() 2. task_obj.Start()
1 は現在のスレッドでタスクを実行し、それが終了するまで待機します。2 はタスクをスケジューラに登録し、それを実行します。2 はタスクが終了するまで待機しません。
一般に、マルチスレッドをサポートしている処理系では、スレッドを効率よく使い回すため「スレッドプール」という仕組みが用意されています。スレッドプールはあらかじめ複数のスレッドを生成し、それをプールしておきます。タスクを実行するとき、空いているスレッドを探してタスクに割り当てます。空きスレッドが無い場合、スレッドが空くまで待つことになります。このスケジューリングは C# が行います。これを「タスクスケジューラ」といいます。1 と 2 は引数にタスクスケジューラを指定することもできます。
簡単な例を示しましょう。
リスト : タスクのサンプル (task.csx) using System.Threading; using System.Threading.Tasks; void test(int n, string name) { for (int i = 1; i <= n; i++) { Console.WriteLine("{0}: {1}", i, name); Thread.Sleep(500); } }
関数 test は引数 name を 500 ミリ秒の間隔で n 回表示します。関数 Thread.Sleep は実行中のスレッドを指定した秒数だけ休止します。
Thread.Sleap(int msec) => void
引数の型が int の場合、時間の単位はミリ秒になります。それでは実行してみましょう。
$ dotnet-script > #load "task.csx" > var a = new Task(() => test(5, "foo")); > a Task(Id = 1, Status = Created, Method = "Void <<Initialize>>b__0_0()") { AsyncState=null, CancellationPending=false, CreationOptions=None, Exception=null, Id=1, Status=Created }
コンストラクタ Task でタスクを生成し、変数 a にセットします。test の引数は 2 つあるので、Task にはラムダ式を渡して、そこから test を呼び出すようにしています。変数 a を表示すると、Task にはいろいろなプロパティがあることがわかります。たとえば、Status はタスクの状態を表します。詳細は公式リファレンス Task クラス | Microsoft Docs をお読みください。
タスク a を RunSynchronously() で実行すると次のようになります。
> a.RunSynchronously(); Console.WriteLine("end"); 1: foo 2: foo 3: foo 4: foo 5: foo end > a Task(Id = 1, Status = RanToCompletion, Method = "{null}") { AsyncState=null, CancellationPending=false, CreationOptions=None, Exception=null, Id=1, Status=RanToCompletion }
タスク a の実行が終了してから、次の命令 WriteLine が実行されていることがわかります。Status もタスクが終了したことを表す値に書き換えられています。
次は Start() でタスクを実行してみましょう。次の例を見てください。
> var b = new Task(() => test(5, "bar")); > b.Start(); Console.WriteLine("end"); end > 1: bar 2: bar 3: bar 4: bar 5: bar
Start() のあと、すぐに次の命令 WriteLine が実行されていますね。タスク b は別スレッドで非同期に実行されていることがわかります。次は、複数のタスクを非同期で実行してみましょう。
> var a = new Task(() => test(4, "foo")); > var b = new Task(() => test(6, "bar")); > var c = new Task(() => test(8, "baz")); > a.Start(); b.Start(); c.RunSynchronously(); 1: bar 1: baz 1: foo 2: baz 2: foo 2: bar 3: bar 3: foo 3: baz 4: baz 4: bar 4: foo 5: bar 5: baz 6: bar 6: baz 7: baz 8: baz >
タスク a, b, c が非同期 (並列的) に動作していることがわかります。
Task にはタスクを生成してそのまま実行する関数 Run() も用意されています。
Task.Run(...) => Task object Task<R>.Run(...) => Task<R> object
Run() の引数はコンストラクタと同じです。Run() はコンストラクタと Start() をいっしょに行います。簡単な例を示しましょう。
> var d = Task.Run(() => test(5, "Foo")); Console.WriteLine("end"); end > 1: Foo 2: Foo 3: Foo 4: Foo 5: Foo >
値を返す場合は Task<R> を使います。結果はプロパティ Result で取得することができます。
リスト : Task<R> のサンプル (task.csx に追加) int test1(int n, string name) { for (int i = 1; i <= n; i++) { Console.WriteLine("{0}: {1}", i, name); Thread.Sleep(500); } return n; }
> #load "task.csx" > var a = Task.Run(() => test1(5, "foo")); var b = a.Result; Console.WriteLine("end"); 1: foo 2: foo 3: foo 4: foo 5: foo end > b 5
プロパティ Result にアクセスすると、そのタスクが終了するまで待たされることに注意してください。
メソッド Wait を使うと、そのタスクが終了するまで呼び出し元のスレッドは実行を休止します。
task_obj.Wait() => void
> var a = Task.Run(() => test(5, "foo")); a.Wait(); Console.WriteLine("end"); 1: foo 2: foo 3: foo 4: foo 5: foo end > var a = new Task(() => test(5, "foo")); > var b = new Task(() => test(6, "bar")); > a.Start(); b.Start(); a.Wait(); b.Wait(); Console.WriteLine("end"); 1: foo 1: bar 2: foo 2: bar 3: foo 3: bar 4: foo 4: bar 5: foo 5: bar 6: bar end
複数のタスクがすべて終了するのを待ちたい場合は関数 Task.WaitAll を使うと便利です。
Task.WaitAll(params Task[] tasks) => void
> var a = new Task(() => test(5, "foo")); > var b = new Task(() => test(6, "bar")); > a.Start(); b.Start(); Task.WaitAll(a, b); Console.WriteLine("end"); 1: foo 1: bar 2: foo 2: bar 3: bar 3: foo 4: foo 4: bar 5: bar 5: foo 6: bar end
複数のタスクを実行し、そのなかでどれか一つタスクが終了するのを待ちたいときは関数 Task.WaitAny を使うと便利です。
Task.WaitAny(Task[] tasks) => int
返り値は終了したタスクを示す配列 tasks の添字です。
> var xs = new Task[] {new Task(() => test(9, "foo")), new Task(() => test(7, "bar")), new Task(() => test(5, "baz"))}; > xs[0].Start(); xs[1].Start(); xs[2].Start(); var x = Task.WaitAny(xs); Console.WriteLine("end"); 1: foo 1: baz 1: bar 2: baz 2: foo 2: bar 3: foo 3: bar 3: baz 4: baz 4: bar 4: foo 5: foo 5: bar 5: baz 6: bar 6: foo end > 7: foo 7: bar 8: foo 9: foo > x 2
C# 5.0 から導入された async と await を使うと、もっと簡単にタスクを操作することができます。たとえば、関数 test と test1 を async と await を使って書き直すと次のようになります。
リスト : async と await のサンプル (task.csx に追加) async Task testAsync(int n, string name) { for (int i = 1; i <= n; i++) { Console.WriteLine("{0}: {1}", i, name); await Task.Delay(500); } } async Task<int> test1Async(int n, string name) { for (int i = 1; i <= n; i++) { Console.WriteLine("{0}: {1}", i, name); await Task.Delay(500); } return n; }
処理を別スレッドで非同期に実行する関数 (メソッド) を「非同期メソッド」といいます。C# の場合、非同期メソッドの名前は、末尾に Async を付ける慣習があります。非同期メソッドは async を使うと簡単に定義することができます。
async 返り値の型 メソッド名(引数, ...) { 処理; ... }
関数を定義するとき先頭にキーワード async を付けると、その関数は非同期メソッドになります。非同期メソッドを呼び出すと、Task.Run() と同じように別スレッドでそのメソッドを実行します。返り値の型は Task または Task<R> で、返り値が void の場合は Task、値を返す場合は Task<R> とします。返り値は return で返します。非同期メソッドでは、return の引数を自動的に Task<R> に包んで返します。
await は指定した非同期メソッドが終了するまで待機します。await の返り値は Task<R> の R です。Task の場合は void になります。つまり、次のような記述でメソッドの結果を取得することができます。
var result = await 非同期メソッド
Task.Delay は非同期メソッドで、引数で指定した時間が経過したら終了するタスクを生成します。await と組み合わせることで、指定した時間だけ呼び出し元のタスクを休止することができます。引数の型が int の場合、時間はミリ秒になります。
なお、async で定義された非同期メソッドにおいて、await がないとそのメソッドは逐次的に実行されます。ご注意くださいませ。
簡単な実行例を示します。
> testAsync(5, "foo"); Console.WriteLine("end"); 1: foo end > 2: foo 3: foo 4: foo 5: foo > await testAsync(5, "bar"); Console.WriteLine("end"); 1: bar 2: bar 3: bar 4: bar 5: bar end > var x = await test1Async(7, "baz"); Console.WriteLine("end"); 1: baz 2: baz 3: baz 4: baz 5: baz 6: baz 7: baz end > x 7
最初の例のように、testAsync を呼び出すだけで、その処理は非同期に実行されます。次の例のように await を使うと、testAsync が終了するまで待たされることがわかります。最後の例は結果を取得する場合です。await で test1Async の結果を取り出して変数 x にセットします。処理が完了すると、x の値は 7 になります。
もちろん、Task.WaitAll や Task.WaitAny を使って、非同期メソッドが終了するまで待機することもできます。
> var a = testAsync(5, "foo"); var b = testAsync(7, "bar"); Task.WaitAll(a, b); Console.WriteLine("end"); 1: foo 1: bar 2: bar 2: foo 3: foo 3: bar 4: bar 4: foo 5: foo 5: bar 6: bar 7: bar end > var a = testAsync(5, "foo"); var b = testAsync(7, "bar"); Task.WaitAny(a, b); Console.WriteLine("end"); 1: foo 1: bar 2: bar 2: foo 3: foo 3: bar 4: bar 4: foo 5: foo 5: bar 6: bar end > 7: bar
WhenAll と WhenAny は WaitAll と WaitAny と似ていますが、When の方はタスクを返すところが異なります。
Task.WhenAll(Task[] tasks) => Task Task<R>.WhenAll(Task<R>[] tasks) => Task<R[]> Task.WhenAny(Task[] tasks) => Task Task<R>.WhenAny(Task<R>[] tasks) => Task<Task<R>>
WhenAll が返すタスクは、引数 tasks のタスクがすべて終了したときに終了します。WhenAny が返すタスクは、引数 tasks のタスクの中でどれか一つ終了したときに終了します。これらの関数は await と組み合わせて使うと大変便利です。Task<R> の場合、WhenAll はタスクの結果を格納した配列 R[] を返します。WhenAny は終了したタスク Task<R> を返します。
簡単な例を示しましょう。
> var xs = new Task<int>[] {test1Async(5, "foo"), test1Async(7, "bar")}; var ys = await Task.WhenAll(xs); 1: foo 1: bar 2: bar 2: foo 3: foo 3: bar 4: bar 4: foo 5: foo 5: bar 6: bar 7: bar > ys int[2] { 5, 7 }
> var xs = new Task<int>[] {test1Async(5, "foo"), test1Async(7, "bar")}; var ys = await Task.WhenAny(xs); 1: foo 1: bar 2: bar 2: foo 3: foo 3: bar 4: bar 4: foo 5: foo 5: bar 6: bar > 7: bar ys AsyncTaskMethodBuilder<int>.AsyncStateMachineBox<test1Async> (Id = 1, Status = RanToCompletion, Method = "{null}", Result = "5")
もう一つ簡単な例を示しましょう。フィボナッチ数列を二重再帰で求める関数 fibo を定義します。複数の値を逐次処理と並列処理で計算し、その実行時間を計測してみましょう。プログラムは次のようになります。
リスト : フィボナッチ関数 (逐次処理) int fibo(int n) { if (n < 2) { return n; } else { return fibo(n - 2) + fibo(n - 1); } } void testfibo() { var xs = new int[]{39, 40, 41, 42}; var ys = new int[4]; DateTime s = DateTime.Now; for (int i = 0; i < 4; i++) { ys[i] = fibo(xs[i]); } DateTime e = DateTime.Now; Console.WriteLine("{0}, {1}, {2}, {3}", ys[0], ys[1], ys[2], ys[3]); Console.WriteLine("{0}", e - s); }
関数 fibo は二重再帰でフィボナッチ数列を求めます。実行時間はとても遅いです。関数 testfibo は 39 から 42 までのフィボナッチ数を順番に求めて配列 ys にセットし、その実行時間を求めます。このプログラムを並列処理に変更すると、次のようになります。
リスト : フィボナッチ関数 (並列処理) async Task<int> fiboAsync(int n) { var x = await Task<int>.Run(() => fibo(n)); return x; } async Task testfibo1() { var xs = new int[]{39, 40, 41, 42}; DateTime s = DateTime.Now; var ys = new Task<int>[] { fiboAsync(xs[0]), fiboAsync(xs[1]), fiboAsync(xs[2]), fiboAsync(xs[3]) }; var zs = await Task.WhenAll(ys); DateTime e = DateTime.Now; Console.WriteLine("{0}, {1}, {2}, {3}", zs[0], zs[1], zs[2], zs[3]); Console.WriteLine("{0}", e - s); }
関数 fiboAsync は非同期で関数 fibo を呼び出します。fibo をそのまま呼び出すと逐次的に実行されるので、Task.Run で強制的に別スレッドで fibo を実行し、それが終了するまで await で待機します。関数 testfibo1 は配列 ys に生成したタスクをセットし、それらがすべて終了するまで await Task.WhenAll(ys) で待機します。これで複数のフィボナッチ数を並列に求めることができます。
それでは実行してみましょう。
> testfibo(); 63245986, 102334155, 165580141, 267914296 00:00:08.4605908 > await testfibo1(); 63245986, 102334155, 165580141, 267914296 00:00:05.2303030
M.Hiroi のパソコン (CPU, Intel Core i5-6200U 2.30GHz) は物理コア数が 2 で、1 コアにつきハイパースレッディングで 2 分割することができるので、コア数の上限は 2 * 2 = 4 になります。これを「論理コア数」と呼ぶことがあります。並列処理のほうが約 1.6 倍速くなったので、並列処理の効果は十分に出ていると思います。
なお、このような並列処理はクラス Parallel を使ったほうが簡単にプログラムすることができます。Parallel には並列処理を行うための関数が用意されています。基本的な関数を以下に示します。
1. Parallel.For(start, end, var => { ... }) 2. Parallel.ForEach(collection, var => { ... }) 3. Parallel.Invoke(params Action[] actions)
1 は for 文を、2 は foreach 文を、3 は引数の Action を (可能であれば) 並列に処理するものです。簡単な例を示しましょう。
> Parallel.For(0, 3, i => test(5, xs[i])); 1: foo 1: baz 1: bar 2: bar 2: baz 2: foo 3: baz 3: bar 3: foo 4: foo 4: baz 4: bar 5: bar 5: foo 5: baz > Parallel.ForEach(xs, n => test(5, n)); 1: foo 1: bar 1: baz 2: foo 2: baz 2: bar 3: baz 3: foo 3: bar 4: foo 4: bar 4: baz 5: baz 5: bar 5: foo > Parallel.Invoke(() => test(5, "foo"), () => test(5, "bar"), () => test(5, "baz")); 1: baz 1: bar 1: foo 2: bar 2: foo 2: baz 3: foo 3: baz 3: bar 4: foo 4: bar 4: baz 5: foo 5: bar 5: baz
どの方法でも 3 つの test が並列に実行されていることがわかります。次はフィボナッチ関数で試してみましょう。
リスト : クラス Prallel による並列処理 void testfibo2() { var xs = new int[]{39, 40, 41, 42}; var ys = new int[4]; DateTime s = DateTime.Now; Parallel.For(0, 4, i => { ys[i] = fibo(xs[i]); }); DateTime e = DateTime.Now; Console.WriteLine("{0}, {1}, {2}, {3}", ys[0], ys[1], ys[2], ys[3]); Console.WriteLine("{0}", e - s); }
プログラムは簡単なので説明は割愛させていただきます。実行結果を示します。
> testfibo2(); 63245986, 102334155, 165580141, 267914296 00:00:05.2620611
Parallel.For による並列処理の効果は十分に出ていると思います。
次は数値積分で円周率 \(\pi\) を求めてみましょう。区間 [a, b] の定積分 \(\int_{a}^{b} f(x) dx\) を数値的に求めるには、区間を細分して小区間の面積を求めて足し上げます。小区間の面積を求める一番簡単な方法は長方形で近似することです。この場合、3 つの方法が考えられます。
1 は左端の値 f(a) を、2 は右端の値 f(b) を、3 は中間点の値 f((a + b) / 2) を使って長方形の面積を計算します。この中で 3 番目の方法が一番精度が高く、これを「中点則」といいます。このほかに、台形で近似する「台形則」や、2 次近似で精度を上げる「シンプソン則」という方法があります。
それでは実際に、1 の方法と中点則でπの値を求めてみましょう。πは次の式で求めることができます。
プログラムは次のようになります。
リスト : 数値積分で円周率を求める // 左端 double leftPoint(int n) { double w = 1.0 / n; double s = 0.0; for (int i = 0; i < n; i++) { double x = i * w; s += 4.0 / (1.0 + x * x); } return s * w; } // 中点則 double midPoint(int n) { double w = 1.0 / n; double s = 0.0; for (int i = 1; i <= n; i++) { double x = (i - 0.5) * w; s += 4.0 / (1.0 + x * x); } return s * w; } // テスト void testPi() { int n = 10; for (int i = 1; i < 10; i++) { double p1 = midPoint(n); double p2 = leftPoint(n); Console.WriteLine("----- {0} -----", n); Console.WriteLine("{0:f16}, {1:e16}", p1, System.Math.PI - p1); Console.WriteLine("{0:f16}, {1:e16}", p2, System.Math.PI - p2); n *= 10; } }
関数 leftPoint は 1 の方法で、関数 midPoint が中点則でπの値を求めます。引数 n が分割数です。最初に小区間の幅を求めて変数 w にセットします。面積は変数 s にセットします。あとは for ループで区間 [0, 1] を n 個に分割して面積を求めます。
最初に x 座標を計算します。leftPoint は i * w でいいのですが、midPoint は中間点を求めるため、変数 i を 1 から始めて、x 座標を次の式で求めます。
double x = (i - 0.5) * w;
たとえば、変数 i が 1 の場合は 0.5 になるので、x は区間 [0 * w, 1 * w] の中間点になります。あとは、4 / (1 + x * x) を計算して s に加算します。最後に s に w を掛け算して全体の面積を求めます。
実行結果を示します。
> testPi(); ----- 10 ----- 3.1424259850010987, -8.3333141130559341e-004 3.2399259889071588, -9.8333335317365655e-002 ----- 100 ----- 3.1416009869231254, -8.3333333322777037e-006 3.1515759869231288, -9.9833333333356755e-003 ----- 1000 ----- 3.1415927369231227, -8.3333329570223214e-008 3.1425924869231245, -9.9983333333142355e-004 ----- 10000 ----- 3.1415926544231341, -8.3334095180020995e-010 3.1416926519231398, -9.9998333346729140e-005 ----- 100000 ----- 3.1415926535981615, -8.3684170704145799e-012 3.1416026535731527, -9.9999833595987297e-006 ----- 1000000 ----- 3.1415926535897643, 2.8865798640254070e-014 3.1415936535897928, -9.9999999969568876e-007 ----- 10000000 ----- 3.1415926535897309, 6.2172489379008766e-014 3.1415927535899870, -1.0000019390332682e-007 ----- 100000000 ----- 3.1415926535904264, -6.3327121324618929e-013 3.1415926635902252, -1.0000432038026474e-008 ----- 1000000000 ----- 3.1415926535899708, -1.7763568394002505e-013 3.1415926545896586, -9.9986552370978643e-010
中点則の場合、分割数を 10 倍すると誤差はほぼ 1 / 100 になります。それに対し、1 の方法は分割数を 10 倍しても誤差は 1 / 10 にしかなりません。このように、1 の方法は分割数を増やさないと精度の高い値を求めることができません。
ただし、浮動小数点数 (double) の計算には誤差があるので、精度には限界があります。中点則の場合、分割数を 1000000 より増やしても精度は高くなりません。1 の方法は分割数を増やすと誤差は少なくなりますが、実行時間がかかるようになります。そこで、並列処理を使って実行時間を短縮してみましょう。
並列化の考え方は簡単です。たとえば、区間を [0, 0.25], [0.25, 0.5], [0.5, 0.75], [0.75, 1] のように四等分して、それぞれの区間を並列に計算します。あとは、その値の足し算すればいいわけです。
プログラムは次のようになります。
リスト : 数値積分で円周率を求める (並列化) const int N = 1000000000; const double W = 1.0 / N; double leftPointPara(int n, int m) { double s = 0.0; for (int i = n; i < m; i++) { double x = i * W; s += 4.0 / (1.0 + x * x); } return s * W; } void testPara() { var result = new double[4]; double a = 0.0; Console.WriteLine("----- 逐次処理 -----"); DateTime s = DateTime.Now; result[0] = leftPointPara(0, N); DateTime e = DateTime.Now; Console.WriteLine("{0:f16}", result[0]); Console.WriteLine("{0}", e - s); Console.WriteLine("----- 二分割 -----"); s = DateTime.Now; Parallel.For(0, 2, i => {result[i] = leftPointPara(N / 2 * i, N / 2 * (i + 1));}); e = DateTime.Now; a = 0.0; for (int i = 0; i < 2; i++) { a += result[i]; Console.WriteLine("{0:f16}", result[i]); } Console.WriteLine("{0:f16}", a); Console.WriteLine("{0}", e - s); Console.WriteLine("----- 四分割 -----"); s = DateTime.Now; Parallel.For(0, 4, i => {result[i] = leftPointPara(N / 4 * i, N / 4 * (i + 1));}); e = DateTime.Now; a = 0.0; for (int i = 0; i < 4; i++) { a += result[i]; Console.WriteLine("{0:f16}", result[i]); } Console.WriteLine("{0:f16}", a); Console.WriteLine("{0}", e - s); }
定数 N が分割数で、W が小区間の幅を表します。関数 leftPointPara の引数 n, m は区間を表します。あとは、Parallel.For で区間を分割して leftPointPara を呼び出すだけです。
それでは実行結果を示します。
> testPara(); ----- 逐次処理 ----- 3.1415926545896586 00:00:03.5524553 ----- 二分割 ----- 1.8545904364032233 1.2870022181865426 3.1415926545897657 00:00:02.1202034 ----- 四分割 ----- 0.9799146526251512 0.8746757837781460 0.7194139994899991 0.5675882186966341 3.1415926545899300 00:00:01.7536681
4 分割で約 2 倍の高速化に成功しました。並列化の効果は十分に出ていると思います。
Parallel.For と Parallel.ForEach は、クラス ParallelLoopState を使ってループを停止したり途中で中断することができます。
1. Parallel.For(start, end, (var, state) => { ... }) => ParallelLoopResult 2. Parallel.ForEach(collection, (var, state) => { ... }) => ParallelLoopResult
For と ForEach は構造体 ParallelLoopResult を返します。ParallelLoopState のインスタンスは For と ForEach が生成し、それをラムダ式の引数 state に渡します。私たちが生成することはできません。ご注意くださいませ。
ループの停止はメソッド Stop を、中断はメソッド Break を使います。
state.Stop() state.Break()
Stop() が実行されると、そのスレッドだけではなく並列ループで実行しているすべてのスレッドが停止します。正確に言うと、スレッドで実行中の処理が終了したあと、次の処理 (繰り返し) に進まないでスレッドを終了する、という動作になります。Break() が実行されると、それを実行したときのインデックスまでが繰り返しの範囲となります。Break() が複数回実行された場合、その中で一番小さなインデックスが繰り返しの範囲となります。
並列ループが正常に終了した場合、ParallelLoopResult のプロパティ IsCompletedStop は true になります。Stop() または Break() が実行された場合、IsCompleted は false になります。Break() が実行された場合、プロパティ LowestBreakIteration に最小のインデックスがセットされます。Break() を実行していない場合は null を返します。
簡単な例を示しましょう。次のプログラムを見てください。
リスト : 並列ループの停止と中断 using System.Threading; using System.Threading.Tasks; // 正常終了 ParallelLoopResult test() { return Parallel.For(0, 20, i => { Console.WriteLine("start {0}", i); Thread.Sleep(500); Console.WriteLine("end {0}", i); }); } // Stop する場合 ParallelLoopResult test1(int stopIdx) { return Parallel.For(0, 20, (i, state) => { Console.WriteLine("start {0}", i); if (stopIdx == i) { Console.WriteLine("stop {0}", i); state.Stop(); } else { Thread.Sleep(500); Console.WriteLine("end {0}", i); } }); } // Break する場合 ParallelLoopResult test2(int brkIdx) { return Parallel.For(0, 20, (i, state) => { Console.WriteLine("start {0}", i); if (brkIdx <= i) { Console.WriteLine("break {0}", i); state.Break(); } else { Thread.Sleep(500); Console.WriteLine("end {0}", i); } }); }
関数 test は並列ループが正常に終了する場合です。test1 は引数 stopIdx と等しいインデックス (i) が実行されたとき Stop を実行します。test2 はインデックス (i) が引数 brkIdx 以上であれば Break を実行します。
以下に実行結果を示します。
> var a = test(); start 0 start 5 start 10 start 15 start 1 start 6 end 15 end 1 end 10 start 2 start 11 end 0 start 4 end 6 start 7 end 5 start 9 start 16 end 16 end 4 end 2 start 3 start 17 end 7 start 8 end 11 start 12 start 13 end 9 start 18 end 18 end 12 start 19 end 17 end 3 end 13 start 14 end 8 end 14 end 19 > a.IsCompleted true
test1 を実行すると、0 から 19 までのインデックスで start と end が表示されます。IsCompleted も true を返すので、並列ループは正常に実行されたことがわかります。
次は test1(8) を実行します。
> var b = test1(8); start 0 start 5 start 10 start 15 start 1 end 10 end 5 end 1 end 15 start 16 start 11 start 6 start 2 end 0 start 4 end 4 start 8 end 2 start 3 end 6 start 7 end 16 start 17 stop 8 end 11 end 17 end 3 end 7 > b.IsCompleted false > b.LowestBreakIteration.HasValue false
インデックスが 8 のときに Stop が実行されます。その後、新しい処理は開始されずに、実行中の処理が終了したら For ループが終了します。IsCompleted の値は false になり、LowestBreakIteration の値は null になるので、Stop が実行されたことがわかります。
最後に test2(8) を実行します。
> var c = test2(8); start 0 start 10 start 15 break 10 break 15 start 1 start 5 start 6 end 6 end 1 start 7 end 5 end 0 start 4 start 9 break 9 start 2 end 2 end 7 start 8 break 8 end 4 start 3 end 3 > c.IsCompleted false > c.LowestBreakIteration.HasValue true > c.LowestBreakIteration 8
Break が最初に実行されるのは 10 番目の処理なので、最小のインデックス値は 10 になります。その前にインデックス 15 の処理が開始されているので、それはそのまま実行されます。このとき、Break も実行されますが、10 < 15 なので最小のインデックス値は更新されません。
Break が実行されると、最小のインデックス値よりも小さなインデックスの処理が実行されるので、1, 5, 7, 6, 4, 9 と並列に処理が実行されます。9 で最小のインデックス値は 10 から 9 に更新され、さらに 8 番目の処理で 8 に更新されます。あとは、8 以下のインデックスで処理されていない 3 番目の処理を実行して並列ループを終了します。終了後、IsCompleted の値は false になり、LowestBreakIteration の値は 8 になるので、Break が実行されたことがわかります。
なお、並列ループの処理の途中で、他のスレッドが Stop や Break の実行したことを検出することもできます。ParallelLoopState のプロパティを使います。
state.IsStopped => bool state.LowestBreakIteration => long?
Stop が実行されると、IsStopped は true になります。Break が実行されると、LowestBreakIteration はその時点での最小のインデックス値になります。