M.Hiroi's Home Page

F# Programming

お気楽 F# プログラミング超入門

[ PrevPage | F# | NextPage ]

並行プログラミング

今回は F# の「非同期式 (async 式)」を使って、簡単な並行プログラミングに挑戦してみましょう。

●並行プログラミングとは?

「並行 (concurrent) プログラミング」は複数のプログラムを並行に実行しますが、このとき複数の CPU で同時に動かす場合と、一つの CPU で複数のプログラムを動かす場合があります。一般的には、前者を「並列 (parallel) プログラミング」といい、複数のハードウェアを並列に実行することによる処理速度の向上が主な目的となります。

後者の場合、一定時間毎に実行するプログラムを切り替えることで、複数のプログラムを並列に実行しているように見せることができます。この処理を「時分割 (time sharing)」もしくは「タイム・スライス (time slice)」といいます。一般に、タイム・スライスは OS でサポートされている機能です。OS が実行するプログラムのことを「プロセス (process)」または「タスク (task)」といいます。

並列的に動作するプログラムをひとつのプロセスだけで作るのはけっこう大変です。そこで、プロセス内では逐次的な処理にとどめ、複数のプロセス間で情報交換を行うことにより、全体で並列的な動作を実現することを考えます。このほうが自然にプログラムを記述できる場合があるのです。これが後者の主な目的となります。

プロセスは互いに独立したプログラムですが、OS にはプロセス間でデータをやり取りする機能 (プロセス間通信) が用意されています。たとえば、UNIX ライクな OS では「パイプ (pipe)」を使って複数のプログラム (コマンド) を連結することができます。この場合、パイプを通してデータがプログラムに送られ、各プログラムは並行に動作することになります。入出力処理で待たされるコマンドがあったとしても、その間に他のコマンドが実行されるので、各コマンドを逐次的に実行するよりも、効率的に処理することが可能です。

最近では、ひとつのプログラムの中で独立した複数の処理を記述できるようになりました。この機能を「スレッド (thread)」とか「マルチスレッド」いいます。スレッドは「縫い糸」という意味ですが、プログラムでは「制御の流れ」という意味で使われています。並列的な動作をプログラムする場合、逐次的な処理をひとつのスレッドに割り当て、複数のスレッドを並行に動作させることにより、全体で並列的な動作を実現するわけです。

一般に、スレッドは一定時間毎に実行するスレッドを強制的に切り替えます。このとき、スレッドのスケジューリングは処理系が行います。これを「プリエンプティブ (preemptive)」といいます。これに対し、Ruby のファイバーや Lua のコルーチンは、プログラムの実行を一定時間毎に切り替えるものではありません。他のプログラムが実行できるよう自主的に処理を中断する、といった協調的な動作を行わせることで、複数のプログラムを並行に動作させています。これを「ノンプリエンプティブ (nonpreemptive)」といいます。

スレッドは同じプロセス内に存在するので、メモリ空間を共有することができます。これを「共有メモリ」といいます。スレッド間の通信は共有メモリを使って簡単に行うことができますが、プリエンプティブなスレッドの場合、共有メモリのアクセス時に発生する「競合」が問題になります。このため、プリエンプティブなマルチスレッドをサポートしているプログラミング言語では、競合を回避するための仕組みが用意されています。

複数のスレッドを協調的に動作させる場合、スレッドの待ち合わせが必要になることがあります。この処理を「同期 (synchronization)」といいます。逆に、複数のスレッドを協調させずに動作させることを「非同期 (asynchronous)」といいます。.NET (C#) の場合、マルチスレッドはクラス Thread を使ってプログラムすることができます。.NET Framework 4.5 からはより使いやすいタスク (Task) が、さらに C# 5.0 からは async, await という機能が導入されました。

F# の場合、非同期の処理は「非同期式 (async 式)」を使ってプログラムします。また、「タスク式 (task 式)」を使うと、.NET (C#) のタスクを操作することもできます。

●非同期式 (async 式)

F# で並行プログラミングを行う場合、「非同期式 (async 式)」を使うと便利です。

async { expression; ... }

async 式は式 expreesion を非同期で実行するように設定します。async 式はクラス Async<'T> のインスタンスを返します。'T はコンピュテーション式のキーワード retrun によって返される値の型です。

async 式を評価しただけでは、中の式は実行されません。expression を実行するには、モジュール Async に用意されている関数を使います。基本的な関数を以下に示します。

1. Async.RunSynchronously async-expr
2. Async.StartImmediate async-expr
3. Async.Start async-expr

1 は現在のスレッドで async-expr (Async<'T>) を実行し、それが終了するまで待機します。終了したら Async<'T> から 'T を取り出して返します。2 は async-expr (Async<unit>) を非同期で実行します。async-expr が終了するまで待機しません。すぐに次の処理が実行されます。3 はスレッドプールで async-expr (Async<unit>) を実行します。2 と同じく async-expr が終了するまで待機しません。

スレッドプールはスレッドを効率よく使い回すための仕組みです。あらかじめ複数のスレッドを生成し、それをプールしておきます。async 式を実行するとき、空いているスレッドを探して非同期式に割り当てます。空きスレッドが無い場合、スレッドが空くまで待つことになります。このスケジューリングは F# が行います。StartImmediate と違って、Start は async 式をすぐに実行するとは限らないことに注意してください。

async の中ではキーワード let! と do! を使って他の async 式を実行することができます。

let! 変数名 = async-expr
do! async-expr

let! は右辺 async-expr (Async<'T>) を実行し、それが終了するまで待機します。終了したら左辺の変数に 'T をセットします。do! は async-expr (Async<unit>) を実行し、それが終了するまで待機します。

簡単な例を示しましょう。0.5 秒間隔で name を n 回表示するプログラムを作ります。

リスト : 0.5 秒間隔で Name を N 回表示する

let test n name = async {
  for i = 1 to n do
    printfn "%d: %s" i name
    do! Async.Sleep 500
}

モジュール Async の関数 Sleep はプログラムの実行を指定した時間だけ休止する async 式 (Async<unit>) を返します。

Async.Sleep msec => Async<unit>

引数の型が int のときはミリ秒 (msec) を表します。do! と組み合わせることで、Sleep を呼び出した async 式の実行を指定した時間だけ休止することができます。

test を RunSynchronously で実行すると次のようになります。

> let test n name = async {
-   for i = 1 to n do
-     printfn "%d: %s" i name
-     do! Async.Sleep 500
- };;
val test: n: int -> name: string -> Async<unit>

> test 5 "foo" |> Async.RunSynchronously; printfn "end";;
1: foo
2: foo
3: foo
4: foo
5: foo
end
val it: unit = ()

test の実行が終了してから、次の命令 printfn が実行されていることがわかります。次は、StartImmediate と Start で実行してみましょう。

> test 5 "foo" |> Async.StartImmediate; printfn "end";;
1: foo
end
val it: unit = ()

> 2: foo
3: foo
4: foo
5: foo

> test 5 "foo" |> Async.Start; printfn "end";;
1: foo
end
val it: unit = ()

> 2: foo
3: foo
4: foo
5: foo

どちらの関数でも、test を実行したあと、すぐに次の命令 printfn が実行されていますね。test は別スレッドで非同期に実行されていることがわかります。次は、複数のタスクを非同期で実行してみましょう。

> test 5 "foo" |> Async.Start; test 6 "bar" |> Async.Start; test 7 "baz" |>
Async.RunSynchronously; printfn "end";;
1: foo
1: baz
1: bar
2: bar
2: foo2:
baz
33: bar:
baz
3: foo
4: baz4: bar

4: foo
5: bar
5: baz
5: foo
6: baz
6: bar
7: baz
end
val it: unit = ()

表示は乱れますが、3 つの async 式が非同期 (並列的) に動作していることがわかります。

async 式で値を返す場合はキーワード return を使います。値を返すように関数 test を修正します。

リスト : 0.5 秒間隔で Name を N 回表示する

let test1 n name = async {
  for i = 1 to n do
    printfn "%d: %s" i name
    do! Async.Sleep 500
  return n
}

async 式の最後に return n を追加するだけです。これで関数 test1 は値を返す async 式 (Async<int>) を生成することができます。return! というキーワードもあります。これは let! で値を求め、それを return で返すことと同じ働きをします。

それでは実際に試してみましょう。

> let test1 n name = async {
-   for i = 1 to n do
-     printfn "%d: %s" i name
-     do! Async.Sleep 500
-   return n
- };;
val test1: n: int -> name: string -> Async<int>

> test1 7 "foo" |> Async.RunSynchronously;;
1: foo
2: foo
3: foo
4: foo
5: foo
6: foo
7: foo
val it: int = 7

●async 式と再帰呼び出し

async 式は再帰呼び出しを行うことができます。test と test1 を再帰定義で書き直すと次のようになります。

リスト : async 式と再帰定義

let testRec n name = 
  let rec iter i = async {
    if i > n then return ()
    else
      printfn "%d: %s" i name
      do! Async.Sleep 500
      return! iter (i + 1)
  }
  iter 1

let testRec1 n name = 
  let rec iter i = async {
    if i > n then return n
    else
      printfn "%d: %s" i name
      do! Async.Sleep 500
      return! iter (i + 1)
  }
  iter 1

局所関数 iter は return! を使うことで「末尾再帰」になります。この場合、末尾再帰最適化が行われるため、効率的に動作します。簡単な実行例を示しましょう。

> let testRec n name =
-   let rec iter i = async {
-     if i > n then return ()
-     else
-       printfn "%d: %s" i name
-       do! Async.Sleep 500
-       return! iter (i + 1)
-   }
-   iter 1;;
val testRec: n: int -> name: string -> Async<unit>

> testRec 7 "foo" |> Async.RunSynchronously;;
1: foo
2: foo
3: foo
4: foo
5: foo
6: foo
7: foo
val it: unit = ()

> let testRec1 n name =
-   let rec iter i = async {
-     if i > n then return n
-     else
-       printfn "%d: %s" i name
-       do! Async.Sleep 500
-       return! iter (i + 1)
-   }
-   iter 1;;
val testRec1: n: int -> name: string -> Async<int>

> testRec1 7 "bar" |> Async.RunSynchronously;;
1: bar
2: bar
3: bar
4: bar
5: bar
6: bar
7: bar
val it: int = 7

二重再帰の場合、async 式でもプログラムは可能ですが、実行速度は通常の関数よりも極端に遅くなるようです。フィボナッチ関数で試してみましょう。

リスト : フィボナッチ関数 (二重再帰)

let rec fibo n =
  if n < 2 then n
  else fibo(n - 1) + fibo(n - 2)

let rec fiboRec n = async {
  if n < 2 then return n
  else
    let! a = fiboRec (n - 1)
    let! b = fiboRec (n - 2)
    return (a + b)
}
> let rec fibo n =
-   if n < 2 then n
-   else fibo(n - 1) + fibo(n - 2);;
val fibo: n: int -> int

> let rec fiboRec n = async {
-   if n < 2 then return n
-   else
-     let! a = fiboRec (n - 1)
-     let! b = fiboRec (n - 2)
-     return (a + b)
- };;
val fiboRec: n: int -> Async<int>

> #time;;

--> 今すぐタイミング オン

> fibo 35;;
リアル: 00:00:00.067、CPU: 00:00:00.080、GC 全般0: 0, 全般1: 0, 全般2: 0
val it: int = 9227465

> fiboRec 35 |> Async.RunSynchronously;;
リアル: 00:00:07.606、CPU: 00:00:07.620、GC 全般0: 5495, 全般1: 3, 全般2: 1
val it: int = 9227465

fiboRec の場合、let! で新たなスレッドが生成され、それが終了するのを待つという動作になります。多数のスレッドが生成されるため、実行時間が極端に遅くなるのだと思います。async 式で再帰を使うときにはご注意くださいませ。

●逐次処理と並列処理

複数の async 式を順番に実行するときは関数 Async.Sequential を使うと便利です。

Async.Sequential async-expers => Async<'T[]>
async-exprs : seq<Async<'T>>

引数 async-exprs は async 式を格納したシーケンスです。リストや配列でもかまいません。Sequential はシーケンスに格納された async 式を順番に実行するように設定した async 式を返します。async 式の実行が終了すると、次の async 式を実行します。複数の async 式が並行に実行されるわけではないことに注意してください。

async 式の実行には Async.RunSynchronously を使います。次の例を見てください。

> let a = ["foo"; "bar"; "baz"; "oops"];;
val a: string list = ["foo"; "bar"; "baz"; "oops"]

> let b = a |> List.map (fun x -> test 5 x) |> Async.Sequential;;
val b: Async<unit[]>

> b |> Async.RunSynchronously;;
1: foo
2: foo
3: foo
4: foo
5: foo
1: bar
2: bar
3: bar
4: bar
5: bar
1: baz
2: baz
3: baz
4: baz
5: baz
1: oops
2: oops
3: oops
4: oops
5: oops
val it: unit[] = [|(); (); (); ()|]

変数 a に名前のリストをセットします。List.map で async 式を格納したリストを作り、それを Async.Sequential に渡します。これで test を逐次実行する async 式を生成することができます。あとは Async.RunSynchronously に渡して実行するだけです。実行結果をみると、リストの先頭要素から順番に実行されていることがわかります。

複数の async 式を並行に実行したい場合は Async.Parallel を使います。

Async.Prallel async-expers => Async<'T[]>
async-exprs : seq<Async<'T>>

Parallel はシーケンスに格納された async 式を並行に実行するように設定した async 式を返します。マルチコア CPU で実行すると、async 式は「並列」に動作します。次の例を見てください。

> let c = a |> List.map (fun x -> test 5 x) |> Async.Parallel;;
val c: Async<unit[]>

> c |> Async.RunSynchronously;;
1: bar1: baz
1: foo

1: oops
2: oops2: 2: baz

foo
2: bar
3: bar
3: baz
3: oops
3: foo
44: 4: foo
baz
4: bar
: oops
5: bar5: 5: foo

baz
5: oops
val it: unit[] = [|(); (); (); ()|]

表示は乱れていますが、test が並行 (並列) に実行されていることがわかります。

もう一つ簡単な例を示しましょう。フィボナッチ数列を二重再帰で求める関数 fibo を定義します。複数の値を Sequential と Prallel で計算し、その実行時間を計測してみましょう。

> let rec fibo n =
-   if n < 2 then n
-   else fibo (n - 1) + fibo (n - 2);;
val fibo: n: int -> int

> let xs = [39..42] |> List.map (fun n -> async { return (fibo n) }) |> Async.Sequential;;
val xs: Async<int[]>

> let ys = [39..42] |> List.map (fun n -> async { return (fibo n) }) |> Async.Parallel;;
val ys: Async<int[]>

> #time;;

 -> 今すぐタイミング オン

> xs |> Async.RunSynchronously;;
リアル: 00:00:04.519、CPU: 00:00:04.540、GC 全般0: 0, 全般1: 0, 全般2: 0
val it: int[] = [|63245986; 102334155; 165580141; 267914296|]

> ys |> Async.RunSynchronously;;
リアル: 00:00:02.850、CPU: 00:00:07.410、GC 全般0: 0, 全般1: 0, 全般2: 0
val it: int[] = [|63245986; 102334155; 165580141; 267914296|]

M.Hiroi のパソコン (CPU, Intel Core i5-6200U 2.30GHz) は物理コア数が 2 で、1 コアにつきハイパースレッディングで 2 分割することができるので、コア数の上限は 2 * 2 = 4 になります。これを「論理コア数」と呼ぶことがあります。Parallel のほうが約 1.6 倍速くなったので、並列処理の効果は十分に出ていると思います。

複数の async 式の値をすべて求めるのではなく、計算に成功した値がひとつあれば十分なこともあります。この場合、関数 Async.Choice を使うと便利です。

Async.Choice async-exprs => Async<'T option>
async-exprs : seq<Async<'T option>>

Choice は複数の async 式を並列に実行し、最初に計算が成功した値を返すよう設定した async 式を生成します。個々の async 式の型は Async<'T option> です。すべての計算が失敗した場合、Choice の返り値は Async<None> になります。

簡単な例を示しましょう。

> let zs = [41; 40; 39] |> List.map (fun n -> async { return Some (fibo n) }) |> Async.Choice;;
val zs: Async<int option>

> zs |> Async.RunSynchronously;;
val it: int option = Some 63245986

fibo 41, fibo 40, fibo 39 を並列に実行します。この場合、一番最初に計算終了した値が求まります。結果は fibo 39 の値 63245986 になりました。

●数値積分

次は数値積分で円周率πを求めてみましょう。区間 [a, b] の定積分 \(\int_{a}^{b} f(x) \, dx\) を数値的に求めるには、区間を細分して小区間の面積を求めて足し上げます。小区間の面積を求める一番簡単な方法は長方形で近似することです。この場合、3 つの方法が考えられます。

  1. (b - a) * f(a)
  2. (b - a) * f(b)
  3. (b - a) * f((a + b) / 2)

1 は左端の値 f(a) を、2 は右端の値 f(b) を、3 は中間点の値 f((a + b) / 2) を使って長方形の面積を計算します。この中で 3 番目の方法が一番精度が高く、これを「中点則」といいます。このほかに、台形で近似する「台形則」や、2 次近似で精度を上げる「シンプソン則」という方法があります。

それでは実際に、1 の方法と中点則で \(\pi\) の値を求めてみましょう。\(\pi\) は次の式で求めることができます。

\( \pi = \displaystyle \int_{a}^{b} \dfrac{4}{1 + x^2} \, dx \)

プログラムは次のようになります。

リスト : 数値積分で円周率を求める

// 左端
let leftPoint n = 
  let w = 1.0 / (float n)
  let rec iter i s =
    if i >= n then s * w
    else
      let x = w * (float i)
      iter (i + 1) (s + 4.0 / (1.0 + x * x))
  iter 0 0.0

// 中点則
let midPoint n =
  let w = 1.0 / (float n)
  let rec iter i s =
    if i > n then s * w
    else
      let x = ((float i) - 0.5) * w
      iter (i + 1) (s + 4.0 / (1.0 + x * x))
  iter 1 0.0

// テスト
let test () =
  let rec iter i n =
    if i > 9 then ()
    else
      let p1 = midPoint n
      let p2 = leftPoint n
      printfn "----- %d -----" n
      printfn "%.16f, %.16e" p1 (System.Math.PI - p1)
      printfn "%.16f, %.16e" p2 (System.Math.PI - p2)
      iter (i + 1) (n * 10)
  iter 1 10

関数 leftPoint は 1 の方法で、関数 midPoint が中点則でπの値を求めます。引数 n が分割数です。最初に小区間の幅を求めて変数 w にセットします。面積は局所関数 iter の引数 s にセットします。あとは iter で区間 [0, 1] を n 個に分割して面積を求めます。

最初に x 座標を計算します。leftPoint は (float i) * w でいいのですが、midPoint は中間点を求めるため、変数 i を 1 から始めて、x 座標を次の式で求めます。

let x = ((float i) - 0.5) * w

たとえば、変数 i が 1 の場合は 0.5 になるので、x は区間 [0 * w, 1 * w] の中間点になります。あとは、4 / (1 + x * x) を計算して s に加算します。最後に s に w を掛け算して全体の面積を求めます。

実行結果を示します。

> test();;
----- 10 -----
3.1424259850010987, -8.3333141130559341e-004
3.2399259889071588, -9.8333335317365655e-002
----- 100 -----
3.1416009869231254, -8.3333333322777037e-006
3.1515759869231288, -9.9833333333356755e-003
----- 1000 -----
3.1415927369231227, -8.3333329570223214e-008
3.1425924869231245, -9.9983333333142355e-004
----- 10000 -----
3.1415926544231341, -8.3334095180020995e-010
3.1416926519231398, -9.9998333346729140e-005
----- 100000 -----
3.1415926535981615, -8.3684170704145799e-012
3.1416026535731527, -9.9999833595987297e-006
----- 1000000 -----
3.1415926535897643, 2.8865798640254070e-014
3.1415936535897928, -9.9999999969568876e-007
----- 10000000 -----
3.1415926535897309, 6.2172489379008766e-014
3.1415927535899870, -1.0000019390332682e-007
----- 100000000 -----
3.1415926535904264, -6.3327121324618929e-013
3.1415926635902252, -1.0000432038026474e-008
----- 1000000000 -----
3.1415926535899708, -1.7763568394002505e-013
3.1415926545896586, -9.9986552370978643e-010
val it: unit = ()

中点則の場合、分割数を 10 倍すると誤差はほぼ 1 / 100 になります。それに対し、1 の方法は分割数を 10 倍しても誤差は 1 / 10 にしかなりません。このように、1 の方法は分割数を増やさないと精度の高い値を求めることができません。

ただし、浮動小数点数 (float) の計算には誤差があるので、精度には限界があります。中点則の場合、分割数を 1000000 より増やしても精度は高くなりません。1 の方法は分割数を増やすと誤差は少なくなりますが、実行時間がかかるようになります。そこで、並列処理を使って実行時間を短縮してみましょう。

●数値積分の並列化

並列化の考え方は簡単です。たとえば、4 つの async 式で並列化するのであれば、区間を [0, 0.25], [0.25, 0.5], [0.5, 0.75], [0.75, 1] のように四等分して、それぞれの区間を 1 つの async 式で並列に計算します。あとは、その値の足し算すればいいわけです。

プログラムは次のようになります。

リスト : 数値積分で円周率を求める (並列化)

let N = 1000000000
let W = 1.0 / (float N)

let leftPoint n m =
  let rec iter i s =
    if i >= m then s * W
    else
      let x = (float i) * W
      iter (i + 1) (s + 4.0 / (1.0 + x * x))
  iter n 0.0

// 二分割
let xs = [
  async { return leftPoint 0 (N / 2) }
  async { return leftPoint (N / 2) N } ] |> Async.Parallel

// 四分割
let ys = [
  async { return leftPoint 0 (N / 4) }
  async { return leftPoint (N / 4) (N / 2) }
  async { return leftPoint (N / 2) (N / 4 * 3) }
  async { return leftPoint (N / 4 * 3) N }] |> Async.Parallel

定数 N が分割数で、W が小区間の幅を表します。関数 leftPoint の引数 n, m は区間を表します。あとは、区間を分割して async 式で leftPoint を呼び出すだけです。

それでは実行結果を示します。

> #time;;

--> 今すぐタイミング オン

> leftPoint 0 N;;
リアル: 00:00:01.475、CPU: 00:00:01.510、GC 全般0: 0, 全般1: 0, 全般2: 0
val it: float = 3.141592655

> xs |> Async.RunSynchronously;;
リアル: 00:00:01.060、CPU: 00:00:02.150、GC 全般0: 0, 全般1: 0, 全般2: 0
val it: float[] = [|1.854590436; 1.287002218|]

> it |> Array.sum;;
... 略 ...
val it: float = 3.141592655

> ys |> Async.RunSynchronously;;
リアル: 00:00:00.765、CPU: 00:00:03.020、GC 全般0: 0, 全般1: 0, 全般2: 0
val it: float[] = [|0.9799146526; 0.8746757838; 0.7194139995; 0.5675882187|]

> it |> Array.sum;;
... 略 ...
val it: float = 3.141592655

4 分割で約 1.93 倍の高速化に成功しました。並列化の効果は十分に出ていると思います。

●タスク式 (task 式)

.NET (C#) で非同期処理をプログラムする場合、以前はスレッド (Thread) を使いましたが、現在ではタスク (Task) と async / await を使う方法が主流になってきたようです。F# でもタスクを扱うためのコンピュテーション式「タスク式」が用意されています。

タスク式の構文は async 式とよく似ています。

task { expression; ... }

task 式を評価すると、expression を別スレッドで実行し、すぐに Task<'T> のインスタンスを返します。'T はコンピュテーション式のキーワード retrun によって返される値の型です。キーワード return! もありますが、async 式とは違って、末尾再帰は最適化されないことに注意してください。

task の中ではキーワード let! と do! を使って他のタスクを実行することができます。

let! 変数名 = task
do! task

let! は右辺 task (Task<'T>) を実行し、それが終了するまで待機します。終了したら左辺の変数にタスクの返り値 'T をセットします。do! は task (Task<unit>) を実行し、それが終了するまで待機します。

タスク式の内側でタスクを呼び出すのは簡単なのですが、外側でタスクを操作する場合、Task<'T> を F# の Async<'T> に変換する必要があります。この処理を Async の関数 AwaitTask で行います。

Async.AwaitTask task => Async<'T>

引数のタスクが Task<'T> であれば、AwaitTask は Async<'T> を返します。.NET (C#) の場合、ジェネリックではない Task もあります。引数のタスクが Task であれば Async<unit> を返します。AwaitTask を使うと、async 式の中でタスクを呼び出すことができます。

簡単な例を示しましょう。関数 test と test1 を task 式で書き直します。

リスト : タスク式の例題 (sampletask.fsx)

open System.Threading.Tasks

let test n name = task {
  for i = 1 to n do
    printfn "%d: %s" i name
    do! Task.Delay(500)
}

let test1 n name = task {
  for i = 1 to n do
    printfn "%d: %s" i name
    do! Task.Delay(500)
  return n
}

クラス Task を使うときは、System.Threading.Tasks を open してください。Task.Delay は Async.Sleep のタスクバージョンです。それでは実際に試してみましょう。

> #load "sampletask.fsx";;
[読み込み中 /home/mhiroi/fsharp/sampletask.fsx]
namespace FSI_0002
  val test: n: int -> name: string -> System.Threading.Tasks.Task
  val test1: n: int -> name: string -> System.Threading.Tasks.Task

> open Sampletask;;
- test 5 "foo" |> Async.AwaitTask |> Async.RunSynchronously;;
1: foo
2: foo
3: foo
4: foo
5: foo
val it: unit = ()

> test 6 "bar" |> Async.AwaitTask |> Async.Start;;
1: bar
val it: unit = ()

> 2: bar
3: bar
4: bar
5: bar
6: bar

> ["foo"; "bar"; "baz"] |> List.map (fun x -> test 5 x |> Async.AwaitTask) 
|> Async.Parallel |> Async.RunSynchronously;;
1: foo
1: bar
1: baz
2: foo
2: bar
2: baz
33: foo
3: bar
: baz
4: baz
4: foo
4: bar
5: bar
5: foo
5: baz
val it: unit[] = [|(); (); ()|]

> ["foo"; "bar"; "baz"] |> List.map (fun x -> test1 5 x |> Async.AwaitTask)
|> Async.Parallel |> Async.RunSynchronously;;
1: foo
1: bar
1: baz
2: baz
2: foo
2: bar
3: bar
3: foo
3: baz
4: foo
4: bar
4: baz
5: 5: foo
baz
5: bar
val it: int[] = [|5; 5; 5|]

test, test1 は簡単なプログラムなので、task 式を使うメリットはありませんが、.NET (C#) のタスクを F# で利用するとき、task 式は役に立つと思います。

●排他制御

最初に説明しましたが、スレッドは同じプロセス内に存在するので、メモリ空間を共有することができます。これを共有メモリといいます。スレッド間の通信は共有メモリを使って簡単に行うことができますが、共有メモリのアクセス時に発生する「競合」が問題になります。このため、あるスレッドがデータにアクセスしている間、他のスレッドからアクセスできないように制限します。これを「排他制御」といいます。

具体的には、鍵を担当するオブジェクト (同期オブジェクト) を用意します。データにアクセスする場合、最初にオブジェクトに鍵をかけます。この操作を「ロック (lock)」といいます。逆に、鍵を外す操作を「アンロック (unlock)」といいます。

オブジェクトがすでにロックされている場合、それがアンロックされるまでスレッドは待機します。オブジェクトをロックしたスレッドは、データにアクセスしたあと、オブジェクトをアンロックします。そのあと、待機していたスレッドがオブジェクトをロックして、データにアクセスすることになります。

.NET には排他制御を行うためのクラスがいくつか用意されていますが、F# では関数 lock を使うと簡単です。

> lock;;
val it: ('a -> (unit -> 'b) -> 'b) when 'a: not struct

第 1 引数 'a は同期オブジェクトです。'a: not struct は「参照型の制約」を表します。'a は値型ではなく参照型でなければいけません。C# のリファレンス lock ステートメント では Object (F# では obj) のインスタンスを使っています。

第 2 引数には引数のない関数 (サンク, thunk) を渡します。オブジェクトをロックしたあと、渡されたサンクを評価します。サンクには複数のスレッドが重なり合ってはいけない処理を記述します。これを「クリティカルセクション」といいます。サンクの評価が終了すると、ロックは自動的に解除されます。

簡単な例を示しましょう。.NET の並列ループ Parallel.For を使います。Parallel には並列処理を行うための関数が用意されています。よく使用される関数に For と ForEach があります。

1. Parallel.For(start, end, fun var -> expr)
2. Parallel.ForEach(collection, fun var -> expr)

Parallel を使用するときは System.Threading.Tasks を open してください。1 は for 文を、2 は foreach 文を (可能であれば) 並列に処理するものです。詳しい説明は拙作のページ C# 超入門: 並行プログラミング編 Parallel による並列処理 をお読みください。

リスト : データの競合 (排他制御なし)

open System.Threading.Tasks

let test_bad n = 
  let mutable cnt = 0
  Parallel.For(0, n, fun _ -> cnt <- cnt + 1) |> ignore
  cnt

関数 test_bad はループの回数を cnt でカウントします。逐次処理で行うと、当然ですが cnt の値は引数 n と同じになりますが、Parallel.For で並列に行うと、cnt のアクセスで競合が発生するため、cnt の値は n と等しくなりません。実際に試してみましょう。

> open System.Threading.Tasks;;
> let test_bad n =
-   let mutable cnt = 0
-   Parallel.For(0, n, fun _ -> cnt <- cnt + 1) |> ignore
-   cnt;;
val test_bad: n: int -> int

> test_bad 1000000;;
val it: int = 830569

> test_bad 1000000;;
val it: int = 829091

> test_bad 1000000;;
val it: int = 934514

cnt <- cnt + 1 は 1 つの式ですが、低レベルの操作では次の 3 つの処理に分けられます。

1. cnt の値を読み込む
2. その値に 1 を加算する
3. 結果を cnt に書き込む

処理が分かれていると、途中で他のスレッドが実行されることがあるのです。たとえば、スレッド A が cnt の値を読み込み、その値が 1000 だとしましょう。そして、A が cnt の値を書き換える前にスレッド B が実行されたとします。すると、スレッド B が読み込む cnt の値は 1000 のままです。これではループの回数を正確に数えることはできませんね。

ちなみに、1, 2, 3 のような低レベルの操作は、その途中で他のスレッドが実行されることはなく、処理は最後まで実行されます。このような操作を「不可分操作」とか「アトミック操作 (atomic operation)」といいます。インクリメントやデクリメントのような操作は、1 命令で実行できる CPU もあるので、その命令を使えば不可分操作になります。

test_bad では、cnt <- cnt + 1 で競合が発生しているので、関数 lock で排他制御すると正常に動作します。次のリストを見てください。

リスト : lock による排他制御

let test1 n = 
  let mutable cnt = 0
  let lc = obj()
  Parallel.For(0, n, fun _ -> lock lc (fun () -> cnt <- cnt + 1)) |> ignore
  cnt

変数 lc に obj のインスタンスをセットします。これが lock で使用する同期オブジェクトになります。Parallel.For のラムダ式で lock を評価し、lock に渡すサンクの中で cnt <- cnt + 1 を評価します。これで cnt <- cnt + 1 を実行している間、他のスレッドが重なり合うことはありません。

それでは実際に試してみましょう。

> let test1 n =
-   let mutable cnt = 0
-   let lc = obj()
-   Parallel.For(0, n, fun _ -> lock lc (fun () -> cnt <- cnt + 1)) |> ignore
-   cnt;;
val test1: n: int -> int

> test1 1000000;;
val it: int = 1000000

> test1 1000000;;
val it: int = 1000000

正常に動作していますね。ところで、lock による排他制御はけっこう重たい処理なので、今回のように変数の値を書き換えるだけならば、.NET (C#) のクラス System.Threading.Interlocked を使うことができます。整数の加算、インクリメント、デクリメントは以下の関数で行うことができます。

Interlocked.Add(var, value) => 結果
Interlocked.Increment(var) => 結果
Interlocked.Decrement(var) => 結果

引数 var には参照型の変数を渡します。関数 Add は var.Value <- var.Value + value を不可分操作として実行し、その結果を返します。Increment は var.Value <- var.Value + 1 を、Decrement は var.Value <- var.Value - 1 を不可分操作として実行し、その結果を返します。test1 を Increment を使って書き直すと次のようになります。

リスト : クラス Interlocked による不可分操作

open System.Threading

let test2 n = 
  let cnt = ref 0
  Parallel.For(0, n, fun _ -> Interlocked.Increment(cnt) |> ignore) |> ignore
  cnt.Value
> open System.Threading;;
> let test2 n =
-   let cnt = ref 0
-   Parallel.For(0, n, fun _ -> Interlocked.Increment(cnt) |> ignore) |> ignore
-   cnt.Value;;
val test2: n: int -> int

> test2 1000000;;
val it: int = 1000000

> test2 1000000;;
val it: int = 1000000

lock よりも Increment のほうが処理が軽いので、実行時間はこちらのほうが速くなります。このほかにも Interlocked には便利な関数が用意されています。詳細は .NET (C#) リファレンス Interlocked クラス をお読みください。


Copyright (C) 2022 Makoto Hiroi
All rights reserved.

[ PrevPage | F# | NextPage ]