M.Hiroi's Home Page

お気楽 OCaml プログラミング入門

遅延ストリーム


Copyright (C) 2008-2020 Makoto Hiroi
All rights reserved.

はじめに

「ストリーム (stream)」はデータの流れを抽象化したデータ構造です。たとえば、ファイル入出力はストリームと考えることができます。リストを使ってストリームを表すこともできます。ただし、単純なリストでは有限個のデータの流れしか表すことができません。ところが、遅延評価を用いると擬似的に無限個のデータを表すことができるようになります。これを「遅延ストリーム」とか「遅延リスト」と呼びます。今回は遅延ストリームについて説明します。

●遅延ストリームの構造

遅延ストリームの基本的な考え方は、必要になったときに新しいデータを生成することです。このときに遅延評価を用います。具体的にはデータを生成する関数を用意し、それを遅延評価してストリームに格納しておきます。そして、必要になった時点で遅延評価しておいた関数を呼び出して値を求めればよいわけです。

遅延ストリームのデータ型は次のようになります。

リスト : 遅延ストリームのデータ型とアクセス関数

open Lazy;;

(* 例外 *)
exception Empty_stream

(* データ型 *)
type 'a stream = Nils | Cons of 'a * 'a stream lazy.t

(* ストリームの先頭要素を取り出す *)
let stream_head = function
  Nils -> raise Empty_stream
| Cons (x, _) -> x

(* ストリームの次の要素を求める *)
let stream_tail = function
  Nils -> raise Empty_stream
| Cons(_, tail) -> force tail

データ型は 'a stream としました。Nils はストリームの終端を表します。無限ストリームだけを扱うのであれば Nils は必要ありません。Cons が遅延ストリームの本体を表していて、組の最初の要素が現時点での先頭データを表し、次の要素が遅延ストリームを生成する関数を格納する遅延オブジェクト (lazy.t) です。

遅延ストリームの生成はコンストラクタ Cons を使うことにします。関数 stream_head は遅延ストリームから要素を取り出して返します。関数 stream_tail は遅延オブジェクトを force して、次の要素を格納した遅延ストリームを生成して返します。ようするに、これらの操作はリストのコンス演算子 :: と関数 hd と tl に対応するわけです。

●遅延ストリームの生成

それでは、遅延ストリームを生成する関数を作りましょう。たとえば、low から high までの整数列を生成するストリームは次のようにプログラムすることができます。

リスト : 整数列を生成するストリーム

let rec range low high =
  if low > high then Nils
  else Cons (low, lazy (range (low + 1) high))

関数 range は遅延ストリームを生成して返します。Cons の第 1 要素が現時点でのデータになります。第 2 要素の遅延オブジェクトを force すると、range (low + 1) high が実行されて次のデータを格納した遅延ストリームが返されます。そして、その中の遅延オブジェクトを force すると、その次のデータを得ることができます。

関数 range の型は次のようになります。

val range : int -> int -> int stream = <fun>

簡単な例を示しましょう。

# let s0 = intgen 1 100;;
val s0 : int stream = Cons (1, <lazy>)
# stream_head s0;;
- : int = 1
# let s1 = stream_tail s0;;
val s1 : int stream = Cons (2, <lazy>)
# stream_head s1;;
- : int = 2
# let s2 = stream_tail s1;;
val s2 : int stream = Cons (3, <lazy>)
# stream_head s2;;
- : int = 3

このように、遅延ストリームに stream_tail を適用することで、次々とデータを生成することができます。

もう一つ、簡単な例を示しましょう。フィボナッチ数列を生成する遅延ストリームを作ります。次のリストを見てください。

リスト : フィボナッチ数列を生成する遅延ストリーム

let rec fibonacci a b = Cons (a, lazy (fibonacci b (a + b)))

関数 fibonacci の型は次のようになります。

val fibonacci : int -> int -> int stream = <fun>

関数 fibonacci の引数 a がフィボナッチ数列の最初の項で、b が次の項です。したがって、遅延オブジェクトに fibonacci b (a + b) を格納しておけば、stream_tail を適用することでフィボナッチ数列を生成することができます。なお、この関数は無限ストリームを生成しますが、OCaml の整数 (int) には上限値があるので、際限なくフィボナッチ数列を生成できるわけではありません。ご注意ください。

これらの関数の動作を一般化すると、次のような関数を定義することができます。

リスト : ストリームの生成 (逆畳み込み)

let rec stream_unfold ?(pred = fun _ -> false) iterate seed =
  if pred seed then Nils
  else Cons (seed, lazy (stream_unfold ~pred iterate (iterate seed)))
val stream_unfold : ?pred:('a -> bool) -> ('a -> 'a) -> 'a -> 'a stream = <fun>

関数 stream_unfold は初項を引数 seed で受け取り、次項を関数 iterate で生成します。Cons の第 1 引数に seed を渡して、第 2 引数で stream_unfold を呼び出すときに iterate seed で次項を生成します。引数 pred は終了条件を表す述語です。pred seed が真であればストリームの終端 (Nils) を返します。pred のデフォルトは無条件で false を返すので無限ストリームになります。

簡単な実行例を示します。succ は引数を +1 する関数です。

# let s0 = stream_unfold succ 1;;
val s0 : int stream = Cons (1, <lazy>)
# stream_head s0;;
- : int = 1
# let s1 = stream_tail s0;;
val s1 : int stream = Cons (2, <lazy>)
# stream_head s1;;
- : int = 2
# let s2 = stream_tail s1;;
val s2 : int stream = Cons (3, <lazy>)
# stream_head s2;;
- : int = 3

# let f0 = stream_unfold (fun (a, b) -> (b, a + b)) (0, 1);;
val f0 : (int * int) stream = Cons ((0, 1), <lazy>)
# fst (stream_head f0);;
- : int = 0
# let f1 = stream_tail f0;;
val f1 : (int * int) stream = Cons ((1, 1), <lazy>)
# fst (stream_head f1);;
- : int = 1
# let f2 = stream_tail f1;;
val f2 : (int * int) stream = Cons ((1, 2), <lazy>)
# fst (stream_head f2);;
- : int = 1
# let f3 = stream_tail f2;;
val f3 : (int * int) stream = Cons ((2, 3), <lazy>)
# fst (stream_head f3);;
- : int = 2

●遅延ストリームの変換

リストを遅延ストリームに変換することも簡単にできます。次のリストを見てください。

リスト : リストを遅延ストリームに変換

let rec stream_of_list = function
    [] -> Nils
  | x::xs -> Cons(x, lazy (stream_of_list xs))

関数 stream_of_list は引数のリストから要素を順番に取り出して、遅延ストリームに格納して返すだけです。

簡単な実行例を示します。

# let s = stream_of_list [1; 2; 3; 4; 5];;
val s : int stream = Cons (1, <lazy>)
# stream_head s;;
- : int = 1
# stream_head (stream_tail s);;
- : int = 2
# stream_head (stream_tail (stream_tail s));;
- : int = 3

逆に、有限のストリームをリストに変換することも簡単です。次のリストを見てください。

リスト : 有限ストリームをリストに変換

let rec list_of_stream s =
  if s = Nils then []
  else (stream_head s) :: list_of_stream (stream_tail s)

遅延ストリームの要素を stream_head で取り出して、それをリストに追加していくだけです。

簡単な実行例を示します。

# let s = stream_of_list [1; 2; 3; 4; 5];;
val s : int stream = Cons (1, <lazy>)
# list_of_stream s;;
- : int list = [1; 2; 3; 4; 5]

●遅延ストリームの操作関数

次は遅延ストリームを操作する関数を作りましょう。最初は n 番目の要素を求める関数 stream_ref です。今回は OCaml に合わせて先頭の要素を 0 番目とします。

リスト : n 番目の要素を求める

let rec stream_ref s n =
  if s = Nils then raise Empty_stream
  else if n = 0 then stream_head s
  else stream_ref (stream_tail s) (n - 1)

関数 stream_ref の型は次のようになります。

val stream_ref : 'a stream -> int -> 'a = <fun>

stream_ref は遅延ストリーム s に stream_tail を n 回適用して n 番目の要素を求めます。

ストリームから n 個の要素を取り出して新しいストリームに格納して返す関数 stream_take や、ストリームから n 個の要素を取り除く関数 stream_drop も同様にプログラムすることができます。

リスト : n 個の要素を取り出す

let rec stream_take s n =
  if s = Nils || n = 0 then Nils
  else Cons (stream_head s, lazy (stream_take (stream_tail s) (n - 1)))
リスト : n 個の要素を取り除く

let rec stream_drop s n =
  if s = Nils || n = 0 then s
  else stream_drop (stream_tail s) (n - 1)
val stream_take : 'a stream -> int -> 'a stream = <fun>
val stream_drop : 'a stream -> int -> 'a stream = <fun>

どちらの関数も引数 s が遅延ストリームで、引数 n が要素の個数です。s が空になるか、n が 0 になるまで処理を繰り返します。stream_take は stream_head で要素を取り出し、それを新しいストリームに追加して返します。stream_take は有限の遅延ストリームを返すことに注意してください。stream_drop は stream_tail を n 回繰り返すだけです。

それでは、簡単な実行例を示しましょう。

# let s1 = fibonacci 0 1;;
val s1 : int stream = Cons (0, <lazy>)

# for i = 0 to 20 do Printf.printf "%d " (stream_ref s1 i) done; print_newline ();;
0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765
- : unit = ()

# list_of_stream (stream_take s1 20);;
- : int list =
[0; 1; 1; 2; 3; 5; 8; 13; 21; 34; 55; 89; 144; 233; 377; 610; 987; 1597;
 2584; 4181]

# list_of_stream (stream_take (stream_drop s1 20) 20);;
- : int list =
[6765; 10946; 17711; 28657; 46368; 75025; 121393; 196418; 317811; 514229;
 832040; 1346269; 2178309; 3524578; 5702887; 9227465; 14930352; 24157817;
 39088169; 63245986]

変数 s1 にフィボナッチ数列を生成するストリームをセットします。stream_ref で順番に要素を 21 個取り出すと、その値はフィボナッチ数列になっていますね。同様に、stream_take で 20 個の要素を取り出して list_of_stream でリストに変換すると、リストの要素はフィボナッチ数列になります。stream_drop で 20 個の要素を取り除き、そのあと stream_take で 20 個の要素を取り出すと、20 番目以降のフィボナッチ数列を求めることができます。

●遅延ストリームの連結

次は、2 つの遅延ストリームを受け取って 1 つのストリームを返す関数を考えます。一番簡単な操作は 2 つのストリームを結合することです。次のリストを見てください。

リスト : 遅延ストリームの結合

let rec stream_append s1 s2 =
  if s1 = Nils then s2
  else Cons (stream_head s1, lazy (stream_append (stream_tail s1) s2))

関数 stream_append はストリーム s1 と s2 を結合したストリームを返します。処理は簡単で、s1 の要素を順番に取り出していき、s1 が空になったら s2 を返すだけです。stream_append の型は次のようになります。

val stream_append : 'a stream -> 'a stream -> 'a stream = <fun>

簡単な実行例を示しましょう。

# let s1 = range 1 4;;
val s1 : int stream = Cons (1, <lazy>)
# let s2 = range 11 14;;
val s2 : int stream = Cons (11, <lazy>)
# let s3 = stream_append s1 s2;;
val s3 : int stream = Cons (1, <lazy>)
# list_of_stream s3;;
- : int list = [1; 2; 3; 4; 11; 12; 13; 14]

次はストリーム s1 と s2 の要素を交互に出力するストリームを作ります。次のリストを見てください。

リスト : ストリームの要素を交互に出力

let rec interleave s1 s2 =
  if s1 = Nils then s2
  else Cons (stream_head s1, lazy (interleave s2 (stream_tail s1)))

関数 interleave はストリーム s1 と s2 を受け取ります。そして、s1 の要素を新しいストリームに格納したら、次は s2 の要素を新しいストリームに格納します。これは遅延オブジェクトで interleave を呼び出すとき、引数 s1 と s2 の順番を交換するだけです。このとき、s1 は stream_tail を適用して次の要素を求めます。これで s1 と s2 の要素を交互に出力することができます。

interleave の型は次のようになります。

val interleave : 'a stream -> 'a stream -> 'a stream = <fun>

簡単な実行例を示しましょう。

# let s4 = interleave s1 s2;;
val s4 : int stream = Cons (1, <lazy>)
# list_of_stream s4;;
- : int list = [1; 11; 2; 12; 3; 13; 4; 14]

stream_append の場合、無限ストリームを結合することはできませんが、interleave ならば無限ストリームにも対応することができます。簡単な例を示しましょう。

# let rec ones = Cons (1, lazy ones);;
val ones : int stream = Cons (1, <lazy>)
# list_of_stream (stream_take ones 10);;
- : int list = [1; 1; 1; 1; 1; 1; 1; 1; 1; 1]

# let rec twos = Cons (2, lazy twos);;
val twos : int stream = Cons (2, <lazy>)
# list_of_stream (stream_take twos 10);;
- : int list = [2; 2; 2; 2; 2; 2; 2; 2; 2; 2]

# list_of_stream (stream_take (interleave ones twos) 10);;
- : int list = [1; 2; 1; 2; 1; 2; 1; 2; 1; 2]

ones は 1 を無限に出力するストリームで、twos は 2 を無限に出力するストリームです。stream_append で ones と twos を結合しても無限に 1 を出力するだけですが、interleave で ones と twos を結合すれば、1 と 2 を交互に出力することができます。これで無限ストリームの要素を混ぜ合わせることができます。

●参考 URL

  1. 『計算機プログラムの構造と解釈 第二版 (和田英一 訳)』, 3.5 ストリーム

初版 2008 年 9 月 14 日
改訂 2020 年 7 月 26 日