M.Hiroi's Home Page

Functional Programming

お気楽 OCaml プログラミング入門

[ PrevPage | OCaml | NextPage ]

遅延ストリーム (2)

遅延ストリームの続きです。今回は遅延ストリームを使った応用例として、素数や順列を生成するプログラムを作ってみましょう。

●高階関数

遅延ストリームは高階関数も定義することができます。次のリストを見てください。

リスト : 高階関数

(* マップ関数 *)
let rec stream_map proc s =
  if s = Nils then Nils
  else Cons (proc (stream_head s), lazy (stream_map proc (stream_tail s)))

(* フィルター *)
let rec stream_filter pred s =
  if s = Nils then Nils
  else
    let x = stream_head s in
    if pred x then Cons(x, lazy (stream_filter pred (stream_tail s)))
    else stream_filter pred (stream_tail s)

(* 畳み込み *)
let rec stream_fold_left proc a s =
  if s = Nils then a
  else stream_fold_left proc (proc a (stream_head s)) (stream_tail s)

let rec stream_fold_right proc a s =
  if s = Nils then a
  else proc (stream_head s) (stream_fold_right proc a (stream_tail s))

let rec stream_scan_left proc a s =
  Cons (a, lazy (if s = Nils then Nils
                 else stream_scan_left proc (proc (stream_head s) a) (stream_tail s)))

(* 巡回 *)
let rec stream_iter proc s =
  if s = Nils then ()
  else (proc (stream_head s); stream_iter proc (stream_tail s))

関数の型は次のようになります。

val stream_map : ('a -> 'b) -> 'a stream -> 'b stream = <fun>
val stream_filter : ('a -> bool) -> 'a stream -> 'a stream = <fun>
val stream_fold_left : ('a -> 'b -> 'a) -> 'a -> 'b stream -> 'a = <fun>
val stream_fold_right : ('a -> 'b -> 'b) -> 'b -> 'a stream -> 'b = <fun>
val stream_scan_left : ('a -> 'b -> 'b) -> 'b -> 'a stream -> 'b stream = <fun>
val stream_iter : ('a -> 'b) -> 'a stream -> unit = <fun>

stream_map と stream_filter は関数と遅延ストリームを受け取り、新しい遅延ストリームを生成して返します。stream_map は引数のストリームの要素に関数 proc を適用した結果を新しいストリームに格納して返します。stream_filter は述語 pred が真を返す要素だけを新しいストリームに格納して返します。

stream_fold_left と stream_fold_right は遅延ストリームに対して畳み込み処理を行います。stream_iter は遅延ストリームの要素に関数 proc を適用します。これらの関数に無限ストリームを渡すと、処理が終了しない (無限ループになる) ので注意してください。stream_scan_left は遅延ストリームの先頭から畳み込みを行い、計算途中の累積値を格納した遅延ストリームを返します。stream_fold_left と違って、stream_scan_left は無限ストリームでも動作します。

簡単な実行例を示しましょう。

# let s1 = range 1 100;;
val s1 : int stream = Cons (1, <lazy>)

# let s2 = stream_map (fun x -> x * x) s1;;
val s2 : int stream = Cons (1, <lazy>)

# list_of_stream (stream_take s2 10);;
- : int list = [1; 4; 9; 16; 25; 36; 49; 64; 81; 100]

# let s3 = stream_filter (fun x -> x mod 2 = 0) s1;;
val s3 : int stream = Cons (2, <lazy>)

# list_of_stream (stream_take s3 10);;
- : int list = [2; 4; 6; 8; 10; 12; 14; 16; 18; 20]

# stream_fold_left (+) 0 s1;;
- : int = 5050
# stream_fold_right (+) 0 s1;;
- : int = 5050

# list_of_stream (stream_scan_left (+) 0 (range 1 10));;
- : int list = [0; 1; 3; 6; 10; 15; 21; 28; 36; 45; 55]

# stream_iter (fun x -> print_int x; print_newline()) (stream_take s1 10);;
1
2
3
4
5
6
7
8
9
10
- : unit = ()

変数 s1 に 1 から始まる整数列を生成するストリームをセットします。次に、s1 の要素を 2 乗するストリームを stream_map で生成して変数 s2 にセットします。stream_take で s2 から要素を 10 個取り出すと、s1 の要素を 2 乗した値になります。

s1 から偶数列のストリームを得るには、引数が偶数のときに真を返す述語を stream_filter に渡します。その返り値を変数 s3 にセットして、stream_take で 10 個の要素を取り出すと、リストの要素は 2 から 20 までの値になります。

s1 は有限個の遅延ストリームなので畳み込みを行うことができます。stream_fold_left と stream_fold_right で要素の合計値を求めると 5050 になります。stream_iter で有限ストリームの要素を出力することもできます。

●マップ関数の便利な使い方

2 つのストリームを受け取るマップ関数 stream_map2 も簡単です。プログラムは次のようになります。

リスト : マップ関数 (2)

let rec stream_map2 proc s1 s2 =
  if s1 = Nils || s2 = Nils then Nils
  else Cons (proc (stream_head s1) (stream_head s2),
             lazy (stream_map2 proc (stream_tail s1) (stream_tail s2)))

ストリーム s1 と s2 から要素を取り出し、それを関数 porc に適用します。その結果を新しいストリームに格納します。stream_map2 の型は次のようになります。

val stream_map2 : ('a -> 'b -> 'c) -> 'a stream -> 'b stream -> 'c stream = <fun>

stream_map2 を使うと、ストリームに対していろいろな処理を定義することができます。次の例を見てください。

# let add_stream s1 s2 = stream_map2 (+) s1 s2;;
val add_stream : int stream -> int stream -> int stream = <fun>
# let s1 = range 1 4;;
val s1 : int stream = Cons (1, <lazy>)
# let s2 = range 11 14;;
val s2 : int stream = Cons (11, <lazy>)
# let s5 = add_stream s1 s2;;
val s5 : int stream = Cons (12, <lazy>)
# stream_take s5 4;;
- : int list = [12; 14; 16; 18]

add_stream は s1 と s2 の要素を加算したストリームを返します。この add_stream を使うと、整数を生成するストリームは次のように定義することができます。

# let rec ones = Cons(1, lazy ones);;
val ones : int stream = Cons (1, <lazy>)
# let rec ints = Cons(1, lazy (add_stream ones ints));;
val ints : int stream = Cons (1, <lazy>)
# list_of_stream (stream_take ints 10);;
- : int list = [1; 2; 3; 4; 5; 6; 7; 8; 9; 10]

ストリーム ints は、現在の ints に 1 を足し算することで整数を生成しています。これで整数が生成できるとは不思議ですね。ints の動作を図に示すと、次のようになります。

let rec ones = Cons(1, lazy ones)
             = Cons(1, lazy_obj1)

let rec ints = Cons(1, lazy (add_stream ones ints))
             = Cons(1, lazy_obj2)

lazy_obj2 => Cons(1, lazy_obj1), Cons(1, lazy_obj2) ->
             Cons(1+1, lazy (add_stream (force lazy_obj1) (force lazy_obj2)))
          => Cons(2, lazy (add_stream (force lazy_obj1) (force lazy_obj2)))
          => Cons(2, lazy_obj3)

lazy_obj3 => Cons(1, lazy_obj1), Cons(2, lazy_obj3) -> ...
          => Cons(3, lazy (add_stream (force lazy_obj1) (force lazy_obj3)))
          => Cons(3, lazy_obj4)


        図 : ストリーム ints の動作

ones を Cons(1, lazy_obj1) と表し、ints を Cons(1, lazy_obj2) と表します。lazy_obj は遅延オブジェクトを表します。ints で次の要素を生成するとき、lazy_obj2 を force します。すると、add_stream (stream_map2) に ones と ints が適用され、ストリームの要素 2 と遅延オブジェクト lazy_obj3 が生成されます。このとき、lazy_obj3 の内容は add_stream (force lazy_obj1) (force lazy_obj2) になります。

次の要素を生成するときは、lazy_obj3 を force します。lazy_obj1 は Cons(1, lazy_obj1) に、lazy_obj2 は Cons(2, lazy_obj3) に評価されるので、ストリームの要素は 1 + 2 = 3 になり、遅延オブジェクト lazy_obj4 の内容は add_stream (force lazy_obj1) (force lazy_obj3) になります。そして、この遅延オブジェクトを force することで次の要素を求めることができます。

このように、遅延オブジェクトの中に現時点の整数を保持しておき、そこに 1 を足し算することで整数列を生成しているわけです。ここで、遅延オブジェクトは評価結果をキャッシュしているので、整数 n の次の値を簡単に計算できることに注意してください。もしも、遅延オブジェクトを単純なクロージャで実装した場合、整数 n を求めるため再計算が行われるので、効率はとても悪くなります。

同様の方法でフィボナッチ数列を生成するストリームを定義することができます。

リスト : フィボナッチ数列の生成

let rec fibs = Cons(0, lazy (Cons (1, lazy (add_stream (stream_tail fibs) fibs))))

fibs が現在のフィボナッチ数列を表していて、stream_tail fibs で次の要素を求めます。そして、それらを足し算することで、その次の要素を求めています。この場合、ストリームの初期値として 2 つの要素が必要になることに注意してください。

それでは実行してみましょう。

# stream_take fibs 10;;
- : int list = [0; 1; 1; 2; 3; 5; 8; 13; 21; 34]

このように、2 つのストリームを使ってフィボナッチ数列を生成することができます。

●stream_flatmap

次は高階関数 stream_flatmap を作りましょう。flatmap は map の結果を平坦化する関数で、具体的には map が返すリストの要素を append で連結する動作になります。引数がリストの場合、次のように定義することができます。

リスト : マッピングした結果を平坦化する

let rec flatmap f = function
    [] -> []
  | x::xs -> f x @ flatmap f xs
val flatmap : ('a -> 'b list) -> 'a list -> 'b list = <fun>

簡単な実行例を示します。

# flatmap (fun x -> [x; x; x]) [1; 2; 3; 4; 5];;
- : int list = [1; 1; 1; 2; 2; 2; 3; 3; 3; 4; 4; 4; 5; 5; 5]

stream_flatmap を定義する場合、次のように stream_append を使うと問題が発生します。

リスト : stream_flatmap の定義 (間違い)

let rec stream_flatmap proc s =
  if s = Nils then Nils
  else stream_append
         (proc (stream_head s))
         (stream_flatmap proc (stream_tail s))

OCaml の関数は正格評価なので、stream_append を実行する前に引数が評価されます。つまり、stream_flatmap の評価は遅延されるのではなく、遅延ストリームが空になるまで stream_flatmap が再帰呼び出しされるのです。これでは無限ストリームに対応することができません。

そこで、引数を遅延評価する関数 stream_append_delay を作ることにします。プログラムは次のようになります。

リスト : stream_append_delay と stream_flatmap

(* 遅延ストリームの連結 (遅延評価版) *)
let rec stream_append_delay s1 s2 =
  if s1 = Nils then force s2
  else Cons (stream_head s1, lazy (stream_append_delay (stream_tail s1) s2))

(* マッピングの結果を平坦化する *)
let rec stream_flatmap proc s =
  if s = Nils then Nils
  else stream_append_delay
         (proc (stream_head s))
         (lazy (stream_flatmap proc (stream_tail s)))

stream_append_delay は stream_append とほぼ同じですが、遅延ストリーム s1 が空になったら遅延オブジェクト s2 を force で評価するところが異なります。stream_flatmap では、stream_appned のかわりに stream_append_delay を使います。このとき、lazy で生成した遅延オブジェクトを引数に渡します。

それでは実行してみましょう。

# let s1 = stream_unfold succ 1;;
val s1 : int stream = Cons (1, <lazy>)
# let s2 = stream_flatmap (fun x -> range 1 x) s1;;
val s2 : int stream = Cons (1, <lazy>)
# list_of_stream (stream_take s2 55);;
- : int list =
[1; 1; 2; 1; 2; 3; 1; 2; 3; 4; 1; 2; 3; 4; 5; 1; 2; 3; 4; 5; 6; 1; 2; 3; 4;
 5; 6; 7; 1; 2; 3; 4; 5; 6; 7; 8; 1; 2; 3; 4; 5; 6; 7; 8; 9; 1; 2; 3; 4; 5;
 6; 7; 8; 9; 10]

s1 は無限ストリームになりますが、stream_flatmap は正常に動作していますね。

●stream_take_while と stream_drop_while

次は、遅延ストリームの先頭から述語が真を返す要素を取り出す stream_take_while と要素を取り除く stream_drop_while を作ります。

リスト : stream_take_while と stream_drop_while

(* 述語 pred が真を返す要素を取り出す *)
let rec stream_take_while pred s =
  if s = Nils || not (pred (stream_head s)) then Nils
  else Cons (stream_head s, lazy (stream_take_while pred (stream_tail s)))

(* 述語 pred が真を返す要素を取り除く *)
let rec stream_drop_while pred s =
  if s = Nils || not (pred (stream_head s)) then s
  else stream_drop_while pred (stream_tail s)

どちらの関数も難しいところはないと思います。簡単な実行例を示しましょう。

# let s1 = stream_unfold succ 1;;
val s1 : int stream = Cons (1, <lazy>)
# list_of_stream (stream_take_while (fun x -> x < 11) s1);;
- : int list = [1; 2; 3; 4; 5; 6; 7; 8; 9; 10]
# let s2 = stream_drop_while (fun x -> x < 11) s1;;
val s2 : int stream = Cons (11, <lazy>)
# list_of_stream (stream_take s2 10);;
- : int list = [11; 12; 13; 14; 15; 16; 17; 18; 19; 20]

●組 (pair) を生成する遅延ストリーム

次は、2 つのストリームからその要素の組み合わせを生成するストリームを作りましょう。要素が n 個のストリームの場合、組み合わせは n * n 個あります。次の図を見てください。

(a0, b0) (a0, b1) (a0, b2) ... (a0, bn)
(a1, b0) (a1, b1) (a1, b2) ... (a1, bn)
(a2, b0) (a2, b1) (a2, b2) ... (a2, bn)

                           ...

(an, b0) (an, b1) (an, b2) ... (an, bn)


        図 : n * n 個の組

これは「直積集合」を求めることと同じです。遅延ストリームが有限であれば、stream_flatmap と stream_map を使って簡単にプログラムできます。

リスト : 組の生成 (1)

let rec pair_stream s1 s2 =
  stream_flatmap (fun x -> stream_map (fun y -> (x, y)) s2) s1

実行例を示します。

# list_of_stream (pair_stream (range 1 4) (range 5 8));;
- : (int * int) list =
[(1, 5); (1, 6); (1, 7); (1, 8); (2, 5); (2, 6); (2, 7); (2, 8); (3, 5);
 (3, 6); (3, 7); (3, 8); (4, 5); (4, 6); (4, 7); (4, 8)]

ところが、この方法では無限ストリームに対応できません。実際、s2 に無限ストリームを渡した場合、s1 の最初の要素を a0 とすると (a0, s2の要素) という組しか生成されません。実際に試してみましょう。

# list_of_stream (stream_take (pair_stream (range 1 4) (stream_unfold succ 1)) 16);;
- : (int * int) list =
[(1, 1); (1, 2); (1, 3); (1, 4); (1, 5); (1, 6); (1, 7); (1, 8); (1, 9);
 (1, 10); (1, 11); (1, 12); (1, 13); (1, 14); (1, 15); (1, 16)]

そこで、下図に示すように、対角線上に組を生成していくことにします。

   | a0  a1  a2  a3  a4  a5
---+-----------------------------
b0 | 0   1   3   6   10  15  ...
   |
b1 | 2   4   7   11  16  ...
   |
b2 | 5   8   12  17  ...
   |
b3 | 9   13  18  ...
   |
b4 | 14  19  ...
   |
b5 | 20 ...
   |
   | ...
   |


図 : 無限ストリームによる組の生成

図を見ればおわかりのように、対角線の要素数を n とすると、組は (an-1 b0), (an-2 b1), ..., (a1 bn-2), (a0 bn-1) となっています。これは、s1 から n 個の要素を取り出したリストと、s2 から n 個の要素を取り出して反転したリストを zip でまとめた形になっています。プログラムは次のようになります。

リスト : 組の生成 (2)

let rec pair_stream1 ?(n = 1) s1 s2 =
  let ys = stream_of_list (List.rev (list_of_stream (stream_take s2 n))) in
  stream_append_delay
    (stream_map2 (fun x y -> (x, y)) (stream_take s1 n) ys)
    (lazy (pair_stream1 ~n:(n + 1) s1 s2))

関数 pair_stream1 の引数 n が対角線上の要素数を表します。s2 から取り出したリストを List.rev で反転し、それをストリームに変換して変数 ys にセットします。s1 と ys の要素の組は stream_map2 を使えば簡単に生成できます。あとは、stream_appned_delay で stream_map と pair_stream1 を連結すればいいわけです。これで無限ストリームに対応することができます。

それでは実行してみましょう。

# let ps = pair_stream1 (stream_unfold succ 1) (stream_unfold succ 1);;
val ps : (int * int) stream = Cons ((1, 1), <lazy>)
# list_of_stream (stream_take ps 55);;
- : (int * int) list =
[(1, 1); (1, 2); (2, 1); (1, 3); (2, 2); (3, 1); (1, 4); (2, 3); (3, 2);
 (4, 1); (1, 5); (2, 4); (3, 3); (4, 2); (5, 1); (1, 6); (2, 5); (3, 4);
 (4, 3); (5, 2); (6, 1); (1, 7); (2, 6); (3, 5); (4, 4); (5, 3); (6, 2);
 (7, 1); (1, 8); (2, 7); (3, 6); (4, 5); (5, 4); (6, 3); (7, 2); (8, 1);
 (1, 9); (2, 8); (3, 7); (4, 6); (5, 5); (6, 4); (7, 3); (8, 2); (9, 1);
 (1, 10); (2, 9); (3, 8); (4, 7); (5, 6); (6, 5); (7, 4); (8, 3); (9, 2);
 (10, 1)]
# stream_ref ps 10;;
- : int * int = (1, 5)
# stream_ref ps 54;;
- : int * int = (10, 1)
# stream_ref ps 100;;
- : int * int = (10, 5)

正常に動作していますね。

●集合演算

ここで、遅延ストリームには重複要素が存在せず、要素は昇順に出力されることを前提にすると、遅延ストリームでも集合演算を行うことができます。次のリストを見てください。

リスト : 集合演算

(* 和集合 *)
let rec stream_union s1 s2 =
  if s1 = Nils then s2
  else if s2 = Nils then s2
  else
    let x = stream_head s1 in
    let y = stream_head s2 in
    if x = y then Cons (x, lazy (stream_union (stream_tail s1) (stream_tail s2)))
    else if x < y then Cons (x, lazy (stream_union (stream_tail s1) s2))
    else Cons (y, lazy (stream_union s1 (stream_tail s2)))

(* 積集合 *)
let rec stream_intersect s1 s2 =
  if s1 = Nils || s2 = Nils then Nils
  else
    let x = stream_head s1 in
    let y = stream_head s2 in
    if x = y then Cons (x, lazy (stream_intersect (stream_tail s1) (stream_tail s2)))
    else if x < y then stream_intersect (stream_tail s1) s2
    else stream_intersect s1 (stream_tail s2)
val stream_union : 'a stream -> 'a stream -> 'a stream = <fun>
val stream_intersect : 'a stream -> 'a stream -> 'a stream = <fun>

stream_union は s1 と s2 から要素を取り出して、小さいほうを遅延ストリームに追加します。等しい場合は要素をひとつだけ追加します。このとき、s1 と s2 の両方から先頭要素を取り除くことに注意してください。

stream_intersect も簡単です。s1, s2 の先頭要素を比較して、等しい場合はその要素を遅延ストリームに追加します。s1 の要素が s2 の要素よりも小さい場合は、s1 を一つ進めて次の要素を調べます。s2 の要素が小さい場合は s2 の次の要素を調べます。

簡単な実行例を示しましょう。

# let s1 = stream_scan_left (+) 1 (stream_unfold succ 2);;
val s1 : int stream = Cons (1, <lazy>)

# list_of_stream (stream_take s1 10);;
- : int list = [1; 3; 6; 10; 15; 21; 28; 36; 45; 55]

# let s2 = stream_map (fun x -> x * x) (stream_unfold succ 1);;
val s2 : int stream = Cons (1, <lazy>)

# list_of_stream (stream_take s2 10);;
- : int list = [1; 4; 9; 16; 25; 36; 49; 64; 81; 100]

# list_of_stream (stream_take (stream_union s1 s2) 20);;
- : int list =
[1; 3; 4; 6; 9; 10; 15; 16; 21; 25; 28; 36; 45; 49; 55; 64; 66; 78; 81; 91]

# list_of_stream (stream_take (stream_intersect s1 s2) 7);;
- : int list = [1; 36; 1225; 41616; 1413721; 48024900; 1631432881]

遅延ストリーム s1 は「三角数」、s2 は「四角数」を表します。これらの遅延ストリームを stream_union でまとめると、三角数または四角数の数列になります。stream_intersect でまとめると、三角数かつ四角数の数列 (平方三角数) になります。平方三角数は拙作のページ Puzzle DE Progamming 多角数 でも取り上げています。興味のある方はお読みくださいませ。

●ハミングの問題

ここで stream_union を使うと簡単に解ける問題を紹介しましょう。

[ハミングの問題]

7 以上の素数で割り切れない正の整数を小さい順に N 個求めよ

参考文献 : 奥村晴彦,『C言語による最新アルゴリズム事典』, 技術評論社, 1991 (361 ページより引用)

7 以上の素数で割り切れない正の整数は、素因子が 2, 3, 5 しかない自然数のことで、これを「ハミング数 (Hamming Numbers)」といいます。ハミング数は素因数分解したとき、2i * 3j * 5k (i, j, k >= 0) の形式になります。たとえば、100 以下のハミング数は次のようになります。

1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20, 24, 25, 27, 30, 32, 36, 40, 45, 48, 50, 
54, 60, 64, 72, 75, 80, 81, 90, 96, 100

遅延ストリームを使うと「ハミングの問題」は簡単に解くことができます。小さい順にハミング数を出力する遅延ストリームを hs としましょう。hs は 1 から始まるので次のように定義できます。

(let rec hs = Cons(1, lazy (...))

最初の要素は 1 なので、それに 2, 3, 5 を掛け算した値 (2, 3, 5) もハミング数になります。この値は次の式で生成することができます。

stream_map (fun x -> x * 2) hs)
stream_map (fun x -> x * 3) hs)
stream_map (fun x -> x * 5) hs)

あとは、これらの遅延ストリームを stream_union でひとつにまとめて、小さい順に出力すればいいわけです。

プログラムと実行結果を示します。

リスト : ハミングの問題

let rec hs = Cons (1, lazy (stream_union
                              (stream_map (fun x -> x * 2) hs)
                              (stream_union
                                 (stream_map (fun x -> x * 3) hs)
                                 (stream_map (fun x -> x * 5) hs))))
# list_of_stream (stream_take hs 100);;
- : int list =
[1; 2; 3; 4; 5; 6; 8; 9; 10; 12; 15; 16; 18; 20; 24; 25; 27; 30; 32; 36; 40;
 45; 48; 50; 54; 60; 64; 72; 75; 80; 81; 90; 96; 100; 108; 120; 125; 128;
 135; 144; 150; 160; 162; 180; 192; 200; 216; 225; 240; 243; 250; 256; 270;
 288; 300; 320; 324; 360; 375; 384; 400; 405; 432; 450; 480; 486; 500; 512;
 540; 576; 600; 625; 640; 648; 675; 720; 729; 750; 768; 800; 810; 864; 900;
 960; 972; 1000; 1024; 1080; 1125; 1152; 1200; 1215; 1250; 1280; 1296; 1350;
 1440; 1458; 1500; 1536]

●順列の生成

次は遅延ストリームを使って順列を生成するプログラムを作ってみましょう。遅延ストリームを使う場合、再帰呼び出しの一番深いところで順列が完成するようにプログラムするとうまくいきません。要素が n 個の順列を生成する場合、n - 1 個の順列を生成するストリームを生成し、そこに要素を一つ加えて n 個の順列を生成すると考えます。

この考え方は、拙作のページ 順列と組み合わせ の問題 4 で作成した、順列をリストに格納して返すプログラムと同じです。このプログラムを遅延ストリームに対応させると次のようになります。

リスト : 遅延ストリームによる順列の生成

let rec permutation n s =
  if n = 0 then Cons([], lazy Nils)
  else
    stream_flatmap
      (fun x -> stream_map (fun y -> x::y)
                           (permutation (n - 1) (stream_filter (fun z -> z <> x) s)))
      s
val permutation : int -> 'a stream -> 'a list stream = <fun>

関数 permutation はストリーム s の中から要素を n 個選ぶ順列を生成します。n = 0 の場合は空リストを格納したストリームを返します。あとは、stream_flatmap の匿名関数の中で、permutation を再帰呼び出しして n - 1 個を選ぶ順列を生成します。ストリーム s から要素 x を取り除くため、stream_filter を使っています。これで順列を生成するストリームを作ることができます。

それでは実際に試してみましょう。

# let s1 = permutation 4 (range 1 4);;
val s1 : int list stream = Cons ([1; 2; 3; 4], <lazy>)
# list_of_stream s1;;
- : int list list =
[[1; 2; 3; 4]; [1; 2; 4; 3]; [1; 3; 2; 4]; [1; 3; 4; 2]; [1; 4; 2; 3];
 [1; 4; 3; 2]; [2; 1; 3; 4]; [2; 1; 4; 3]; [2; 3; 1; 4]; [2; 3; 4; 1];
 [2; 4; 1; 3]; [2; 4; 3; 1]; [3; 1; 2; 4]; [3; 1; 4; 2]; [3; 2; 1; 4];
 [3; 2; 4; 1]; [3; 4; 1; 2]; [3; 4; 2; 1]; [4; 1; 2; 3]; [4; 1; 3; 2];
 [4; 2; 1; 3]; [4; 2; 3; 1]; [4; 3; 1; 2]; [4; 3; 2; 1]]

24 通りの順列をすべて求めることができました。

●8クイーンの解法

同様に、遅延ストリームを使って 8 クイーンを解くことができます。

リスト : 8 クイーンの解法 (遅延ストリーム版)

let attack x xs =
  let rec attack_sub x n = function
      [] -> true
    | y :: ys -> if x = y + n || x = y - n then false
                 else attack_sub x (n + 1) ys
  in
    attack_sub x 1 xs

let rec queen s =
  if s = Nils then Cons([], lazy Nils)
  else
    stream_filter
      (fun ls -> match ls with
                   [] -> true
                 | x::xs -> attack x xs)
      (stream_flatmap
        (fun x -> stream_map (fun y -> x::y)
                             (queen (stream_filter (fun z -> z <> x) s)))
        s)
val attack : int -> int list -> bool = <fun>
val queen : int stream -> int list stream = <fun>

関数 queen は permutation とほぼ同じですが、stream_filter を使って追加したクイーンが他のクイーンと衝突しているものを取り除いています。衝突をチェックする関数 attack は バックトラック法 の 8 クイーンで作成したものと同じです。

それでは実行してみましょう。

# list_of_stream (stream_take (queen (range 1 8)) 10);;
- : int list list =
[[1; 5; 8; 6; 3; 7; 2; 4]; [1; 6; 8; 3; 7; 4; 2; 5];
 [1; 7; 4; 6; 8; 2; 5; 3]; [1; 7; 5; 8; 2; 4; 6; 3];
 [2; 4; 6; 8; 3; 1; 7; 5]; [2; 5; 7; 1; 3; 8; 6; 4];
 [2; 5; 7; 4; 1; 8; 6; 3]; [2; 6; 1; 7; 4; 8; 3; 5];
 [2; 6; 8; 3; 1; 4; 7; 5]; [2; 7; 3; 6; 8; 5; 1; 4]]

解の総数は全部で 92 通りあります。遅延ストリームを使うと、必要な分だけの計算しか行われないので効率的です。

●素数の生成

最後に簡単な例題として、ストリームを使って素数を求めるプログラムを作ってみましょう。

考え方は簡単です。最初に、2 から始まる整数列を生成するストリームを用意します。2 は素数なので、素数ストリームの要素になります。次に、この整数列から 2 で割り切れる整数を取り除き除きます。これは stream_filter を使うと簡単です。

2 で割り切れる整数が取り除かれたので、次の要素は 3 になります。今度は 3 で割り切れる整数を取り除けばいいのです。これも stream_filter を使えば簡単です。このとき、入力用のストリームは 2 で割り切れる整数が取り除かれています。したがって、このストリームに対して 3 で割り切れる整数を取り除くように stream_filter を設定すればいいわけです。

このように、素数を見つけたらそれで割り切れる整数を取り除いていくアルゴリズムを「エラトステネスの篩」といいます。ようするに、2 から始まる整数ストリームに対して、見つけた素数 2, 3, 5, 7, 11, ... を順番に stream_fiter で設定して素数でない整数をふるい落としていくわけです。

プログラムは次のようになります。

リスト : 素数の生成

let rec sieve s =
  if s = Nils then raise Empty_stream
  else
    let x = stream_head s in
    Cons (x, lazy (sieve (stream_filter (fun a -> a mod x <> 0) (stream_tail s))))
val sieve : int stream -> int stream = <fun>

sieve の引数 s には 2 から始まる整数列を生成するストリームを渡します。s に stream_tail を適用し、さらに stream_filter を適用すると、整数列から 2 で割り切れる整数を取り除いたストリームが返されます。次の要素 3 を取り出すとき、このストリームに対して 3 で割り切れる整数を取り除くことになるので、2 と 3 で割り切れる整数が取り除かれることになります。次の要素は 5 になりますが、そのストリームからさらに 5 で割り切れる整数が stream_filter で取り除かれることになります。

このように stream_filter が設定されていくことで、素数でない整数をふるい落としていくことができるわけです。それでは実行してみましょう。

# let ps = sieve (stream_unfold succ 2);;
val ps : int stream = Cons (2, <lazy>)
# list_of_stream (stream_take ps 25);;
- : int list =
[2; 3; 5; 7; 11; 13; 17; 19; 23; 29; 31; 37; 41; 43; 47; 53; 59; 61; 67; 71;
 73; 79; 83; 89; 97]
# list_of_stream (stream_take ps 100);;
- : int list =
[2; 3; 5; 7; 11; 13; 17; 19; 23; 29; 31; 37; 41; 43; 47; 53; 59; 61; 67; 71;
 73; 79; 83; 89; 97; 101; 103; 107; 109; 113; 127; 131; 137; 139; 149; 151;
 157; 163; 167; 173; 179; 181; 191; 193; 197; 199; 211; 223; 227; 229; 233;
 239; 241; 251; 257; 263; 269; 271; 277; 281; 283; 293; 307; 311; 313; 317;
 331; 337; 347; 349; 353; 359; 367; 373; 379; 383; 389; 397; 401; 409; 419;
 421; 431; 433; 439; 443; 449; 457; 461; 463; 467; 479; 487; 491; 499; 503;
 509; 521; 523; 541]

●より高速な方法

関数 sieve は簡単にプログラムできますが、生成する素数の個数が多くなると、その実行速度はかなり遅くなります。実をいうと、sieve なみに簡単で sieve よりも高速な方法があります。

整数 n が素数か確かめる簡単な方法は、√n 以下の素数で割り切れるか試してみることです。割り切れる素数 m があれば、n は素数ではありません。そうでなければ、n は素数であることがわかります。

これをそのままプログラムすると次のようになります。

リスト : 素数列の生成

let rec primes_from n =
  if primep n then Cons (n, lazy (primes_from (n + 2)))
  else primes_from (n + 2)
and primep n =
  let rec _primep s =
    let p = stream_head s in
    if p * p > n then true
    else if n mod p = 0 then false
    else _primep (stream_tail s)
  in
  _primep (stream_tail primes)
and primes = Cons (2, lazy (Cons (3, lazy (Cons (5, lazy (primes_from 7))))))

変数 primes は無限の素数列を表します。実際に素数を生成する処理は関数 primes_from で行います。primes_from は述語 primep を呼び出して n が素数かチェックします。そうであれば、Cons で n を遅延ストリームに追加します。そうでなければ primes_from を再帰呼び出しするだけです。偶数は素数ではないので、引数 n には奇数を与えていることに注意してください。primep は primes をたどって n が素数か判定するだけです。

それでは実行してみましょう。

# list_of_stream (stream_take primes 25);;
- : int list =
[2; 3; 5; 7; 11; 13; 17; 19; 23; 29; 31; 37; 41; 43; 47; 53; 59; 61; 67; 71;
 73; 79; 83; 89; 97]
# stream_ref primes 99;;
- : int = 541
# stream_ref primes 500;;
- : int = 3581

100 以下の素数は全部で 25 個あります。また、100 番目の素数は 541 になります。OCaml のリストは 0 から数えるので、(stream-ref primes 99) で 100 番目の素数になります。実行時間ですが、stream_ref で 5000 番目の素数を求めたところ、sieve では約 3 秒かかりましたが、primes で一瞬 (計測不能) で求めることができました。興味のある方はいろいろ試してみてください。

●双子素数

差が 2 である素数の組を「双子素数 (twin prime)」といいます。素数列 primes を使うと双子素数は簡単に求めることができます。

# let twin = stream_filter
               (fun (x, y) -> y - x = 2) 
               (stream_map2 (fun x y -> (x, y)) primes (stream_tail primes));;
val twin : (int * int) stream = Cons ((3, 5), <lazy>)
# list_of_stream (stream_take twin 50);;
- : (int * int) list =
[(3, 5); (5, 7); (11, 13); (17, 19); (29, 31); (41, 43); (59, 61); (71, 73);
 (101, 103); (107, 109); (137, 139); (149, 151); (179, 181); (191, 193);
 (197, 199); (227, 229); (239, 241); (269, 271); (281, 283); (311, 313);
 (347, 349); (419, 421); (431, 433); (461, 463); (521, 523); (569, 571);
 (599, 601); (617, 619); (641, 643); (659, 661); (809, 811); (821, 823);
 (827, 829); (857, 859); (881, 883); (1019, 1021); (1031, 1033);
 (1049, 1051); (1061, 1063); (1091, 1093); (1151, 1153); (1229, 1231);
 (1277, 1279); (1289, 1291); (1301, 1303); (1319, 1321); (1427, 1429);
 (1451, 1453); (1481, 1483); (1487, 1489)]

ところで、双子素数 - Wikipedia によると、『双子素数は無数に存在するかという問題、いわゆる「双子素数の予想」や「双子素数の問題」は、いまだに数学上の未解決問題である。無数に存在するだろう、とは、多くの数論学者が予想している。』 とのことです。

●参考 URL

  1. 計算機プログラムの構造と解釈 第二版 (和田英一 訳), 3.5 ストリーム
  2. Gauche ユーザリファレンス: 6.19 遅延評価

●プログラムリスト

(*
 * lazystream.ml : 遅延ストリーム
 *
 *                 Copyright (C) 2008-2020 Makoto Hiroi
 *)

open Lazy

(* 例外 *)
exception Empty_stream

(* データ型 *)
type 'a stream = Nils | Cons of 'a * 'a stream lazy_t

(* ストリームの先頭要素を取り出す *)
let stream_head = function
  Nils -> raise Empty_stream
| Cons (x, _) -> x

(* ストリームの次の要素を求める *)
let stream_tail = function
  Nils -> raise Empty_stream
| Cons(_, tail) -> force tail

(* n 番目の要素を求める *)
let rec stream_ref s n =
  if s = Nils then raise Empty_stream
  else if n = 0 then stream_head s
  else stream_ref (stream_tail s) (n - 1)

(* n 個の要素を取り出して遅延ストリームに格納する *)
let rec stream_take s n =
  if s = Nils || n = 0 then Nils
  else Cons (stream_head s, lazy (stream_take (stream_tail s) (n - 1)))

(* n 個の要素を取り除く *)
let rec stream_drop s n =
  if s = Nils || n = 0 then s
  else stream_drop (stream_tail s) (n - 1)

(* ストリームの結合 *)
let rec stream_append s1 s2 =
  if s1 = Nils then s2
  else Cons (stream_head s1, lazy (stream_append (stream_tail s1) s2))

let rec interleave s1 s2 =
  if s1 = Nils then s2
  else Cons (stream_head s1, lazy (interleave s2 (stream_tail s1)))

let rec stream_append_delay s1 s2 =
  if s1 = Nils then force s2
  else Cons (stream_head s1, lazy (stream_append_delay (stream_tail s1) s2))

(* 高階関数 *)

(* マップ関数 *)
let rec stream_map proc s =
  if s = Nils then Nils
  else Cons (proc (stream_head s), lazy (stream_map proc (stream_tail s)))

let rec stream_map2 proc s1 s2 =
  if s1 = Nils || s2 = Nils then Nils
  else Cons (proc (stream_head s1) (stream_head s2),
             lazy (stream_map2 proc (stream_tail s1) (stream_tail s2)))

let add_stream s1 s2 = stream_map2 (+) s1 s2

let rec stream_flatmap proc s =
  if s = Nils then Nils
  else stream_append_delay
         (proc (stream_head s))
         (lazy (stream_flatmap proc (stream_tail s)))

(* フィルター *)
let rec stream_filter pred s =
  if s = Nils then Nils
  else
    let x = stream_head s in
    if pred x then Cons(x, lazy (stream_filter pred (stream_tail s)))
    else stream_filter pred (stream_tail s)

(* 畳み込み *)
let rec stream_fold_left proc a s =
  if s = Nils then a
  else stream_fold_left proc (proc a (stream_head s)) (stream_tail s)

let rec stream_fold_right proc a s =
  if s = Nils then a
  else proc (stream_head s) (stream_fold_right proc a (stream_tail s))

let rec stream_scan_left proc a s =
  Cons (a, lazy (if s = Nils then Nils
                 else stream_scan_left proc (proc (stream_head s) a) (stream_tail s)))

(* 巡回 *)
let rec stream_iter proc s =
  if s = Nils then ()
  else (proc (stream_head s); stream_iter proc (stream_tail s))

(* 述語 pred が真を返す要素を取り出す *)
let rec stream_take_while pred s =
  if s = Nils || not (pred (stream_head s)) then Nils
  else Cons (stream_head s, lazy (stream_take_while pred (stream_tail s)))

(* 述語 pred が真を返す要素を取り除く *)
let rec stream_drop_while pred s =
  if s = Nils || not (pred (stream_head s)) then s
  else stream_drop_while pred (stream_tail s)

(* ストリームの生成 *)

(* low から high までの整数列 *)
let rec range low high =
  if low > high then Nils
  else Cons (low, lazy (range (low + 1) high))

(* 無限ストリーム *)
let rec ones = Cons (1, lazy ones)
let rec ints = Cons (1, lazy (add_stream ones ints))
let rec integers x = Cons (x, lazy (integers (x + 1)))

(* フィボナッチ数列 *)
let rec fibonacci a b = Cons (a, lazy (fibonacci b (a + b)))
let rec fibs = Cons (1, lazy (Cons (1, lazy (add_stream (stream_tail fibs) fibs))))

(* 逆畳み込み *)
let rec stream_unfold ?(pred = fun _ -> false) iterate seed =
  if pred seed then Nils
  else Cons (seed, lazy (stream_unfold ~pred iterate (iterate seed)))

(* リストとの相互変換 *)
let rec list_of_stream s =
  if s = Nils then []
  else (stream_head s) :: list_of_stream (stream_tail s)

let rec stream_of_list = function
    [] -> Nils
  | x::xs -> Cons(x, lazy (stream_of_list xs))

(* 組の生成 *)
let rec pair_stream s1 s2 =
  stream_flatmap (fun x -> stream_map (fun y -> (x, y)) s2) s1

let rec pair_stream1 ?(n = 1) s1 s2 =
  let ys = stream_of_list (List.rev (list_of_stream (stream_take s2 n))) in
  stream_append_delay
    (stream_map2 (fun x y -> (x, y)) (stream_take s1 n) ys)
    (lazy (pair_stream1 ~n:(n + 1) s1 s2))

(* 集合演算 *)

(* 和集合 *)
let rec stream_union s1 s2 =
  if s1 = Nils then s2
  else if s2 = Nils then s2
  else
    let x = stream_head s1 in
    let y = stream_head s2 in
    if x = y then Cons (x, lazy (stream_union (stream_tail s1) (stream_tail s2)))
    else if x < y then Cons (x, lazy (stream_union (stream_tail s1) s2))
    else Cons (y, lazy (stream_union s1 (stream_tail s2)))

(* 積集合 *)
let rec stream_intersect s1 s2 =
  if s1 = Nils || s2 = Nils then Nils
  else
    let x = stream_head s1 in
    let y = stream_head s2 in
    if x = y then Cons (x, lazy (stream_intersect (stream_tail s1) (stream_tail s2)))
    else if x < y then stream_intersect (stream_tail s1) s2
    else stream_intersect s1 (stream_tail s2)

(* ハミングの問題 *)
let rec hs = Cons (1, lazy (stream_union
                              (stream_map (fun x -> x * 2) hs)
                              (stream_union
                                 (stream_map (fun x -> x * 3) hs)
                                 (stream_map (fun x -> x * 5) hs))))

(* 順列の生成 *)
let rec permutation n s =
  if n = 0 then Cons([], lazy Nils)
  else
    stream_flatmap
      (fun x -> stream_map (fun y -> x::y)
                           (permutation (n - 1) (stream_filter (fun z -> z <> x) s)))
      s

(***** 8 Queen *****)

(* 衝突のチェック *)
let attack x xs =
  let rec attack_sub x n = function
      [] -> true
    | y :: ys -> if x = y + n || x = y - n then false
                 else attack_sub x (n + 1) ys
  in
    attack_sub x 1 xs

(* 8 Queen の解法 *)
let rec queen s =
  if s = Nils then Cons([], lazy Nils)
  else
    stream_filter
      (fun ls -> match ls with
                   [] -> true
                 | x::xs -> attack x xs)
      (stream_flatmap
        (fun x -> stream_map (fun y -> x::y)
                             (queen (stream_filter (fun z -> z <> x) s)))
        s)

(* 素数の生成 *)
let rec sieve s =
  if s = Nils then raise Empty_stream
  else
    let x = stream_head s in
    Cons (x, lazy (sieve (stream_filter (fun a -> a mod x <> 0) (stream_tail s))))

(* 高速な方法 *)
let rec primes_from n =
  if primep n then Cons (n, lazy (primes_from (n + 2)))
  else primes_from (n + 2)
and primep n =
  let rec _primep s =
    let p = stream_head s in
    if p * p > n then true
    else if n mod p = 0 then false
    else _primep (stream_tail s)
  in
  _primep (stream_tail primes)
and primes = Cons (2, lazy (Cons (3, lazy (Cons (5, lazy (primes_from 7))))))

初版 2008 年 9 月 20 日
改訂 2020 年 8 月 2 日

Copyright (C) 2008-2020 Makoto Hiroi
All rights reserved.

[ PrevPage | OCaml | NextPage ]