M.Hiroi's Home Page

Functional Programming

お気楽 OCaml プログラミング入門

[ PrevPage | OCaml | NextPage ]

メモ化と遅延評価

今回は「たらいまわし関数」を例題にして、「メモ化」と「遅延評価」について説明します。

●たらいまわし関数

最初に「たらいまわし関数」について説明します。次のリストを見てください。

リスト 1 : たらいまわし関数

let rec tarai x y z =
  if x <= y then y
  else tarai (tarai (x - 1) y z) (tarai (y - 1) z x) (tarai (z - 1) x y)

let rec tak x y z = 
  if x <= y then z
  else tak (tak (x - 1) y z) (tak (y - 1) z x) (tak (z - 1) x y)

関数 tarai や tak は「たらいまわし関数」といって、再帰的に定義されています。これらの関数は、引数の与え方によっては実行に時間がかかるため、Lisp などのベンチマークに利用されることがあります。Common Lisp のプログラムは ぬえ 鵺 NUETAK Function にあります。

関数 tarai は通称「竹内関数」と呼ばれていて、日本の代表的な Lisper である竹内郁雄先生によって考案されたそうです。そして、関数 tak は関数 tarai のバリエーションで、John Macarthy によって作成されたそうです。たらいまわし関数が Lisp のベンチマークで使われていたことは知っていましたが、このような由緒ある関数だとは思ってもいませんでした。

さっそく実行してみましょう。実行環境は Ubunts 18.04 (Windows Subsystem for Linux), Intel Core i5-6200U 2.30GHz, ocamlc (version 4.05.0) です。

tarai 13 6 0 : 1.55 [s]
tak 19 9 0   : 0.59 [s]

このように、たらいまわし関数は引数の値が小さくても実行に時間がかかります。

●メモ化による高速化

たらいまわし関数が遅いのは、同じ値を何度も計算しているためです。この場合、表 (table) を使って処理を高速化することができます。同じ値を何度も計算することがないように、計算した値は表に格納しておいて、2 回目以降は表から計算結果を求めるようにします。このような手法を「表計算法」とか「メモ化 (memoization または memoisation)」といいます。

OCaml の場合、メモ化はハッシュ表 (Hashtbl) を使うと簡単です。次のリストを見てください。

リスト 2 : たらいまわし関数のメモ化 (1)

(* メモ用のハッシュ表 *)
let table = Hashtbl.create 2048

let rec tarai x y z =
  let key = (x, y, z) in
  if Hashtbl.mem table key then Hashtbl.find table key
  else
    let value = if x <= y then y
    else
      tarai (tarai (x - 1) y z) (tarai (y - 1) z x) (tarai (z - 1) x y)
    in
    Hashtbl.add table key value;
    value

関数 tarai の値を格納するハッシュ表を大域変数 table に用意します。関数 tarai では、引数 x, y, z を要素とする組を作り、それをキーとしてハッシュ表 table を検索します。table に key があれば、その値を返します。そうでなければ、値を計算して table にセットして、その値を返します。

ところで、ハッシュ表は局所変数に格納することもできます。次のリストを見てください。

リスト 3 : たらいまわし関数のメモ化 (2)

(* 探索 *)
let lookup table func args =
  if Hashtbl.mem table args then
    Hashtbl.find table args
  else
    let value = func args in
    Hashtbl.add table args value;
    value

(* たらいまわし関数 *)
let rec tak (x, y, z) =
  if x <= y then z
  else memo_tak (memo_tak (x - 1, y, z),
                 memo_tak (y - 1, z, x),
                 memo_tak (z - 1, x, y))
and memo_tak =
  let table = Hashtbl.create 2048 in
  fun x -> lookup table tak x

let rec tarai (x, y, z) =
  if x <= y then y
  else memo_tarai (memo_tarai (x - 1, y, z),
                   memo_tarai (y - 1, z ,x),
                   memo_tarai (z - 1, x, y))
and memo_tarai =
  let table = Hashtbl.create 2048 in
  fun x -> lookup table tarai x

関数 lookup はハッシュ表 table から関数 func の引数 args に対応するデータを探します。ここでは関数の引数を組にまとめて args に渡すものとします。ハッシュ表にデータがある場合はその値を返します。そうでなければ、func args を評価して値 value を求め、それをハッシュ表に登録します。

関数 tak と tarai は自分自身を再帰呼び出しするのではなく、関数 memo_tak と memo_tarai を呼び出します。memo_tak と memo_tarai は、ハッシュ表を局所変数 table にセットしてから、匿名関数を使って関数本体を定義します。ハッシュ表が生成されるのは、memo_tak, memo_tarai に関数をセットするときの一回だけです。これで、その関数専用のハッシュ表を局所変数に用意することができます。memo_tak と memo_tarai の本体は、lookup を呼び出してハッシュ表から値を探索するだけです。

関数の型は次のようになります。

val memoize : ('a, 'b) Hashtbl.t -> ('a -> 'b) -> 'a -> 'b = <fun>
val tak : int * int * int -> int = <fun>
val memo_tak : int * int * int -> int = <fun>
val tarai : int * int * int -> int = <fun>
val memo_tarai : int * int * int -> int = <fun>

それでは実際に実行してみましょう。

tarai (192, 96, 0) : 0.10 [s]
tak (192, 96, 0)   : 0.34 [s]

このように、引数の値を増やしても高速に実行することができます。メモ化の効果は十分に出ていると思います。また、同じ計算を再度実行すると、メモ化の働きにより値をすぐに求めることができます。

●メモ化関数

このように関数をメモ化することは簡単にできますが、メモ化を行うたびに関数を修正するのは面倒です。このような場合、関数をメモ化する「メモ化関数」があると便利です。メモ化関数については 計算機プログラムの構造と解釈 第二版 (和田英一 訳)3.3.3 表の表現 に詳しい説明があります。

ただし、変数の値を書き換えることができない関数型言語の場合、汎用的なメモ化関数を作成することは難しく、OCaml でも簡単ではありません。そこで、今回は Lisp を使ってメモ化関数を作成してみましょう。Common Lisp と Scheme のプログラムは次のようになります。

リスト 4 : メモ化関数 (Common Lisp)

(defun memoize (func)
  (let ((table (make-hash-table :test #'equal)))
    #'(lambda (&rest args)
        (let ((value (gethash args table nil)))
          (unless value
            (setf value (apply func args))
            (setf (gethash args table) value))
          value))))

; たらいまわし関数
(defun tak (x y z)
  (if (<= x y)
      z
      (tak (tak (- x 1) y z) (tak (- y 1) z x) (tak (- z 1) x y))))

(defun tarai (x y z)
  (if (<= x y)
      y
      (tarai (tarai (- x 1) y z) (tarai (- y 1) z x) (tarai (- z 1) x y))))

; 関数を書き換える
(setf (symbol-function 'tak) (memoize #'tak))
(setf (symbol-function 'tarai) (memoize #'tarai))
リスト 5 : メモ化関数 (Scheme : Gauche)

; 汎用のメモ化関数
(define (memoize func)
  (let ((table (make-hash-table 'equal?)))
    (lambda args
      (if (hash-table-exists? table args)
          (hash-table-get table args)
          (let ((value (apply func args)))
            (hash-table-put! table args value)
            value)))))

; たらいまわし関数
(define (tak x y z)
  (if (<= x y)
      z
      (tak (tak (- x 1) y z) (tak (- y 1) z x) (tak (- z 1) x y))))

(define (tarai x y z)
  (if (<= x y)
      y
      (tarai (tarai (- x 1) y z) (tarai (- y 1) z x) (tarai (- z 1) x y))))

; 値を書き換える
(set! tak (memoize tak))
(set! tarai (memoize tarai))

関数 memoize は関数 func を引数に受け取り、それをメモ化した関数を返します。memoize が返す関数はクロージャなので、memoize の引数 func や局所変数 table にアクセスすることができます。また、無名関数 lambda の引数 args は可変個の引数を受け取るように定義します。これで、複数の引数を持つ関数にも対応することができます。

args の値は引数を格納したリストになるので、これをキーとして扱います。ハッシュ表 table に値がなければ、関数 func を呼び出して値を計算し、それを table にセットします。そしで、最後に値を返します。なお、変数 tak と tarai の値 (Common Lisp の場合は関数) を書き換えないと、関数 tak, tarai の中で再帰呼び出しするとき、メモ化した関数を呼び出すことはできません。ご注意ください。

●遅延評価による高速化

関数 tarai は「遅延評価 (delayed evaluation または lazy evaluation)」を行う処理系、たとえば関数型言語の Haskell では高速に実行することができます。また、Scheme でも delay と force を使って遅延評価を行うことができます。tarai のプログラムを見てください。x <= y のときに y を返しますが、このとき引数 z の値は必要ありませんね。引数 z の値は x > y のときに計算するようにすれば、無駄な計算を省略することができます。

なお、関数 tak は x <= y のときに z を返しているため、遅延評価で高速化することはできません。ご注意ください。

OCaml には遅延評価を行うための構文 lazy とモジュール Lazy が用意されています。また、完全ではありませんが、クロージャを使って遅延評価を行うこともできます。今回は Shiro さんWiLiKi にある Scheme:たらいまわしべんち を参考に、プログラムを作ってみましょう。次のリストを見てください。

リスト 6 : クロージャによる遅延評価

let rec tarai x y z =
  if x <= y then y
  else
    let zz = z () in
    tarai (tarai (x - 1) y (fun () -> zz))
          (tarai (y - 1) zz (fun () -> x))
          (fun () -> tarai (zz - 1) x (fun () -> y))

遅延評価したい処理をクロージャに包んで引数 z に渡します。そして、x > y のときに引数 z の関数を呼び出します。すると、クロージャ内の処理が評価されて z の値を求めることができます。たとえば、fun () -> 0 を z に渡す場合、z () とすると返り値は 0 になります。fun () -> x を渡せば、x に格納されている値が返されます。fun -> tarai ... を渡せば、関数 tarai が実行されてその値が返されるわけです。

関数 tarai の型は次のようになります。

val tarai : int -> int -> (unit -> int) -> int = <fun>

また、lazy 文を使うと、tarai は次のようになります。

リスト 7 : lazy による遅延評価

let rec tarai x y z =
  if x <= y then y
  else
    let zz = Lazy.force z in
    tarai (tarai (x - 1) y (lazy zz))
          (tarai (y - 1) zz (lazy x))
          (lazy (tarai (zz - 1) x (lazy y)))

lazy expr は、式 expr を評価せずに lazy_t というデータ (遅延オブジェクト) を返します。簡単な使用例を示しましょう。

# let a = lazy (10 + 20);;
val a : int lazy_t = <lazy>
# Lazy.force a;;
- : int = 30

lazy (10 + 20) の返り値を変数 a にセットします。このとき、式 10 + 20 は評価されていません。遅延オブジェクトの値を実際に求める関数が Lazy.force です。Lazy.force a を実行すると、式 10 + 20 を評価して値 30 を返します。

また、遅延オブジェクトは式の評価結果をキャッシュします。したがって、Lazy.force a を再度実行すると、同じ式を再評価することなく値を求めることができます。次の例を見てください。

# let a = lazy (print_string "eval"; 10 + 20);;
val a : int lazy_t = <lazy>
# Lazy.force a;;
eval- : int = 30
# Lazy.force a;;
- : int = 30

最初に Lazy.force a を実行すると、式 (print_string "eval"; 10 + 20) が評価されるので、画面に eval が表示されます。次に、Lazy.force a を実行すると、式を評価せずにキャッシュした値を返すので eval は表示されません。

lazy を使った場合、関数 tarai の型は次のようになります。

val tarai : int -> int -> int Lazy.t -> int = <fun>

それでは、実際に実行してみましょう。

tarai 192 96 0
closure : 0.00078 [s]
lazy    : 0.00047 [s]

実行時間が速いので、今回は tarai 192 96 0 を 100 回実行した時間から 1 回の実行時間を求めました。tarai の場合、遅延評価の効果はとても大きいですね。

ところで、クロージャや lazy を使わなくても、関数 tarai を高速化する方法があります。C++:language&libraries (リンク切れ) で Akira Higuchi さん (リンク切れ) が書かれたC言語の tarai 関数はとても高速です。OCaml でプログラムすると次のようになります。

リスト 8 : tarai の遅延評価

let rec tarai x y z =
  if x <= y then y
  else tarai_lazy (tarai (x - 1) y z) (tarai (y - 1) z x) (z - 1) x y
and tarai_lazy x y xx yy zz =
  if x <= y then y
  else
    let z = tarai xx yy zz in
    tarai_lazy (tarai (x - 1) y z) (tarai (y - 1) z x) (z - 1) x y
val tarai : int -> int -> int -> int = <fun>
val tarai_lazy : int -> int -> int -> int -> int -> int = <fun>

関数 tarai_lazy の引数 xx, yy, zz で z の値を表すところがポイントです。つまり、z の計算に必要な値を引数に保持し、z の値が必要になったときに tarai(xx, yy, zz) で計算するわけです。実際に実行してみると tarai 192 96 0 は 0.0011 [s] になりました。Akira Higuchi さんに感謝いたします。

●問題

次に示す関数を表計算法でプログラムしてください。

  1. 階乗を求める関数 fact n, (0 <= n <= 20)
  2. フィボナッチ数を求める関数 fibo n, (0 <= n <= 90)
  3. 組み合わせの数を求める関数 comb n r, (0 <= n <= 65, r <= n)












●解答

リスト : 表計算法の簡単な例題

exception Domain_error

(* 階乗 *)
let fact =
  let n = 20 in
  let table = Array.make (n + 1) 1 in
  for i = 1 to n do
    table.(i) <- table.(i - 1) * i
  done;
  fun x -> if x < 0 || x > n then raise Domain_error
           else table.(x)

(* フィボナッチ数 *)
let fibo =
  let n = 90 in
  let table = Array.make (n + 1) 0 in
  table.(1) <- 1;
  for i = 2 to n do
    table.(i) <- table.(i - 2) + table.(i - 1)
  done;
  fun x -> if x < 0 || x > n then raise Domain_error
           else table.(x)

(* 組み合わせの数 *)
let comb =
  let x = 65 in
  let table = Array.make (x + 1) [|1|] in
  table.(1) <- [|1; 1|];
  for i = 2 to x do
    table.(i) <- Array.make (i + 1) 1;
    for j = 1 to i - 1 do
      table.(i).(j) <- table.(i - 1).(j - 1) + table.(i - 1).(j)
    done
  done;
  fun n r -> if n > x || r > n then raise Domain_error
             else table.(n).(r)
exception Domain_error
val fact : int -> int = <fun>
val fibo : int -> int = <fun>
val comb : int -> int -> int = <fun>

簡単な実行例を示します。

# for i = 10 to 20 do print_int (fact i); print_newline () done;;
3628800
39916800
479001600
6227020800
87178291200
1307674368000
20922789888000
355687428096000
6402373705728000
121645100408832000
2432902008176640000
- : unit = ()
# fact (-1);;
Exception: Domain_error.
# fact 21;;
Exception: Domain_error.

# for i = 80 to 90 do print_int (fibo i); print_newline () done;;
23416728348467685
37889062373143906
61305790721611591
99194853094755497
160500643816367088
259695496911122585
420196140727489673
679891637638612258
1100087778366101931
1779979416004714189
2880067194370816120
- : unit = ()
# fibo 91;;
Exception: Domain_error.

# for i = 56 to 65 do print_int (comb i (i / 2)); print_newline () done;;
7648690600760440
15033633249770520
30067266499541040
59132290782430712
118264581564861424
232714176627630544
465428353255261088
916312070471295267
1832624140942590534
3609714217008132870
- : unit = ()
# comb 66 33;;
Exception: Domain_error.

# for i = 0 to 16 do for j = 0 to i do Printf.printf "%d " (comb i j) done; print_newline () done;;
1
1 1
1 2 1
1 3 3 1
1 4 6 4 1
1 5 10 10 5 1
1 6 15 20 15 6 1
1 7 21 35 35 21 7 1
1 8 28 56 70 56 28 8 1
1 9 36 84 126 126 84 36 9 1
1 10 45 120 210 252 210 120 45 10 1
1 11 55 165 330 462 462 330 165 55 11 1
1 12 66 220 495 792 924 792 495 220 66 12 1
1 13 78 286 715 1287 1716 1716 1287 715 286 78 13 1
1 14 91 364 1001 2002 3003 3432 3003 2002 1001 364 91 14 1
1 15 105 455 1365 3003 5005 6435 6435 5005 3003 1365 455 105 15 1
1 16 120 560 1820 4368 8008 11440 12870 11440 8008 4368 1820 560 120 16 1
- : unit = ()

初版 2008 年 9 月 14 日
改訂 2020 年 7 月 26 日

遅延ストリーム

「ストリーム (stream)」はデータの流れを抽象化したデータ構造です。たとえば、ファイル入出力はストリームと考えることができます。リストを使ってストリームを表すこともできます。ただし、単純なリストでは有限個のデータの流れしか表すことができません。ところが、遅延評価を用いると擬似的に無限個のデータを表すことができるようになります。これを「遅延ストリーム」とか「遅延リスト」と呼びます。今回は遅延ストリームについて説明します。

●遅延ストリームの構造

遅延ストリームの基本的な考え方は、必要になったときに新しいデータを生成することです。このときに遅延評価を用います。具体的にはデータを生成する関数を用意し、それを遅延評価してストリームに格納しておきます。そして、必要になった時点で遅延評価しておいた関数を呼び出して値を求めればよいわけです。

遅延ストリームのデータ型は次のようになります。

リスト : 遅延ストリームのデータ型とアクセス関数

open Lazy;;

(* 例外 *)
exception Empty_stream

(* データ型 *)
type 'a stream = Nils | Cons of 'a * 'a stream lazy.t

(* ストリームの先頭要素を取り出す *)
let stream_head = function
  Nils -> raise Empty_stream
| Cons (x, _) -> x

(* ストリームの次の要素を求める *)
let stream_tail = function
  Nils -> raise Empty_stream
| Cons(_, tail) -> force tail

データ型は 'a stream としました。Nils はストリームの終端を表します。無限ストリームだけを扱うのであれば Nils は必要ありません。Cons が遅延ストリームの本体を表していて、組の最初の要素が現時点での先頭データを表し、次の要素が遅延ストリームを生成する関数を格納する遅延オブジェクト (lazy.t) です。

遅延ストリームの生成はコンストラクタ Cons を使うことにします。関数 stream_head は遅延ストリームから要素を取り出して返します。関数 stream_tail は遅延オブジェクトを force して、次の要素を格納した遅延ストリームを生成して返します。ようするに、これらの操作はリストのコンス演算子 :: と関数 hd と tl に対応するわけです。

●遅延ストリームの生成

それでは、遅延ストリームを生成する関数を作りましょう。たとえば、low から high までの整数列を生成するストリームは次のようにプログラムすることができます。

リスト : 整数列を生成するストリーム

let rec range low high =
  if low > high then Nils
  else Cons (low, lazy (range (low + 1) high))

関数 range は遅延ストリームを生成して返します。Cons の第 1 要素が現時点でのデータになります。第 2 要素の遅延オブジェクトを force すると、range (low + 1) high が実行されて次のデータを格納した遅延ストリームが返されます。そして、その中の遅延オブジェクトを force すると、その次のデータを得ることができます。

関数 range の型は次のようになります。

val range : int -> int -> int stream = <fun>

簡単な例を示しましょう。

# let s0 = intgen 1 100;;
val s0 : int stream = Cons (1, <lazy>)
# stream_head s0;;
- : int = 1
# let s1 = stream_tail s0;;
val s1 : int stream = Cons (2, <lazy>)
# stream_head s1;;
- : int = 2
# let s2 = stream_tail s1;;
val s2 : int stream = Cons (3, <lazy>)
# stream_head s2;;
- : int = 3

このように、遅延ストリームに stream_tail を適用することで、次々とデータを生成することができます。

もう一つ、簡単な例を示しましょう。フィボナッチ数列を生成する遅延ストリームを作ります。次のリストを見てください。

リスト : フィボナッチ数列を生成する遅延ストリーム

let rec fibonacci a b = Cons (a, lazy (fibonacci b (a + b)))

関数 fibonacci の型は次のようになります。

val fibonacci : int -> int -> int stream = <fun>

関数 fibonacci の引数 a がフィボナッチ数列の最初の項で、b が次の項です。したがって、遅延オブジェクトに fibonacci b (a + b) を格納しておけば、stream_tail を適用することでフィボナッチ数列を生成することができます。なお、この関数は無限ストリームを生成しますが、OCaml の整数 (int) には上限値があるので、際限なくフィボナッチ数列を生成できるわけではありません。ご注意ください。

これらの関数の動作を一般化すると、次のような関数を定義することができます。

リスト : ストリームの生成 (逆畳み込み)

let rec stream_unfold ?(pred = fun _ -> false) iterate seed =
  if pred seed then Nils
  else Cons (seed, lazy (stream_unfold ~pred iterate (iterate seed)))
val stream_unfold : ?pred:('a -> bool) -> ('a -> 'a) -> 'a -> 'a stream = <fun>

関数 stream_unfold は初項を引数 seed で受け取り、次項を関数 iterate で生成します。Cons の第 1 引数に seed を渡して、第 2 引数で stream_unfold を呼び出すときに iterate seed で次項を生成します。引数 pred は終了条件を表す述語です。pred seed が真であればストリームの終端 (Nils) を返します。pred のデフォルトは無条件で false を返すので無限ストリームになります。

簡単な実行例を示します。succ は引数を +1 する関数です。

# let s0 = stream_unfold succ 1;;
val s0 : int stream = Cons (1, <lazy>)
# stream_head s0;;
- : int = 1
# let s1 = stream_tail s0;;
val s1 : int stream = Cons (2, <lazy>)
# stream_head s1;;
- : int = 2
# let s2 = stream_tail s1;;
val s2 : int stream = Cons (3, <lazy>)
# stream_head s2;;
- : int = 3

# let f0 = stream_unfold (fun (a, b) -> (b, a + b)) (0, 1);;
val f0 : (int * int) stream = Cons ((0, 1), <lazy>)
# fst (stream_head f0);;
- : int = 0
# let f1 = stream_tail f0;;
val f1 : (int * int) stream = Cons ((1, 1), <lazy>)
# fst (stream_head f1);;
- : int = 1
# let f2 = stream_tail f1;;
val f2 : (int * int) stream = Cons ((1, 2), <lazy>)
# fst (stream_head f2);;
- : int = 1
# let f3 = stream_tail f2;;
val f3 : (int * int) stream = Cons ((2, 3), <lazy>)
# fst (stream_head f3);;
- : int = 2

●遅延ストリームの変換

リストを遅延ストリームに変換することも簡単にできます。次のリストを見てください。

リスト : リストを遅延ストリームに変換

let rec stream_of_list = function
    [] -> Nils
  | x::xs -> Cons(x, lazy (stream_of_list xs))

関数 stream_of_list は引数のリストから要素を順番に取り出して、遅延ストリームに格納して返すだけです。

簡単な実行例を示します。

# let s = stream_of_list [1; 2; 3; 4; 5];;
val s : int stream = Cons (1, <lazy>)
# stream_head s;;
- : int = 1
# stream_head (stream_tail s);;
- : int = 2
# stream_head (stream_tail (stream_tail s));;
- : int = 3

逆に、有限のストリームをリストに変換することも簡単です。次のリストを見てください。

リスト : 有限ストリームをリストに変換

let rec list_of_stream s =
  if s = Nils then []
  else (stream_head s) :: list_of_stream (stream_tail s)

遅延ストリームの要素を stream_head で取り出して、それをリストに追加していくだけです。

簡単な実行例を示します。

# let s = stream_of_list [1; 2; 3; 4; 5];;
val s : int stream = Cons (1, <lazy>)
# list_of_stream s;;
- : int list = [1; 2; 3; 4; 5]

●遅延ストリームの操作関数

次は遅延ストリームを操作する関数を作りましょう。最初は n 番目の要素を求める関数 stream_ref です。今回は OCaml に合わせて先頭の要素を 0 番目とします。

リスト : n 番目の要素を求める

let rec stream_ref s n =
  if s = Nils then raise Empty_stream
  else if n = 0 then stream_head s
  else stream_ref (stream_tail s) (n - 1)

関数 stream_ref の型は次のようになります。

val stream_ref : 'a stream -> int -> 'a = <fun>

stream_ref は遅延ストリーム s に stream_tail を n 回適用して n 番目の要素を求めます。

ストリームから n 個の要素を取り出して新しいストリームに格納して返す関数 stream_take や、ストリームから n 個の要素を取り除く関数 stream_drop も同様にプログラムすることができます。

リスト : n 個の要素を取り出す

let rec stream_take s n =
  if s = Nils || n = 0 then Nils
  else Cons (stream_head s, lazy (stream_take (stream_tail s) (n - 1)))
リスト : n 個の要素を取り除く

let rec stream_drop s n =
  if s = Nils || n = 0 then s
  else stream_drop (stream_tail s) (n - 1)
val stream_take : 'a stream -> int -> 'a stream = <fun>
val stream_drop : 'a stream -> int -> 'a stream = <fun>

どちらの関数も引数 s が遅延ストリームで、引数 n が要素の個数です。s が空になるか、n が 0 になるまで処理を繰り返します。stream_take は stream_head で要素を取り出し、それを新しいストリームに追加して返します。stream_take は有限の遅延ストリームを返すことに注意してください。stream_drop は stream_tail を n 回繰り返すだけです。

それでは、簡単な実行例を示しましょう。

# let s1 = fibonacci 0 1;;
val s1 : int stream = Cons (0, <lazy>)

# for i = 0 to 20 do Printf.printf "%d " (stream_ref s1 i) done; print_newline ();;
0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765
- : unit = ()

# list_of_stream (stream_take s1 20);;
- : int list =
[0; 1; 1; 2; 3; 5; 8; 13; 21; 34; 55; 89; 144; 233; 377; 610; 987; 1597;
 2584; 4181]

# list_of_stream (stream_take (stream_drop s1 20) 20);;
- : int list =
[6765; 10946; 17711; 28657; 46368; 75025; 121393; 196418; 317811; 514229;
 832040; 1346269; 2178309; 3524578; 5702887; 9227465; 14930352; 24157817;
 39088169; 63245986]

変数 s1 にフィボナッチ数列を生成するストリームをセットします。stream_ref で順番に要素を 21 個取り出すと、その値はフィボナッチ数列になっていますね。同様に、stream_take で 20 個の要素を取り出して list_of_stream でリストに変換すると、リストの要素はフィボナッチ数列になります。stream_drop で 20 個の要素を取り除き、そのあと stream_take で 20 個の要素を取り出すと、20 番目以降のフィボナッチ数列を求めることができます。

●遅延ストリームの連結

次は、2 つの遅延ストリームを受け取って 1 つのストリームを返す関数を考えます。一番簡単な操作は 2 つのストリームを結合することです。次のリストを見てください。

リスト : 遅延ストリームの結合

let rec stream_append s1 s2 =
  if s1 = Nils then s2
  else Cons (stream_head s1, lazy (stream_append (stream_tail s1) s2))

関数 stream_append はストリーム s1 と s2 を結合したストリームを返します。処理は簡単で、s1 の要素を順番に取り出していき、s1 が空になったら s2 を返すだけです。stream_append の型は次のようになります。

val stream_append : 'a stream -> 'a stream -> 'a stream = <fun>

簡単な実行例を示しましょう。

# let s1 = range 1 4;;
val s1 : int stream = Cons (1, <lazy>)
# let s2 = range 11 14;;
val s2 : int stream = Cons (11, <lazy>)
# let s3 = stream_append s1 s2;;
val s3 : int stream = Cons (1, <lazy>)
# list_of_stream s3;;
- : int list = [1; 2; 3; 4; 11; 12; 13; 14]

次はストリーム s1 と s2 の要素を交互に出力するストリームを作ります。次のリストを見てください。

リスト : ストリームの要素を交互に出力

let rec interleave s1 s2 =
  if s1 = Nils then s2
  else Cons (stream_head s1, lazy (interleave s2 (stream_tail s1)))

関数 interleave はストリーム s1 と s2 を受け取ります。そして、s1 の要素を新しいストリームに格納したら、次は s2 の要素を新しいストリームに格納します。これは遅延オブジェクトで interleave を呼び出すとき、引数 s1 と s2 の順番を交換するだけです。このとき、s1 は stream_tail を適用して次の要素を求めます。これで s1 と s2 の要素を交互に出力することができます。

interleave の型は次のようになります。

val interleave : 'a stream -> 'a stream -> 'a stream = <fun>

簡単な実行例を示しましょう。

# let s4 = interleave s1 s2;;
val s4 : int stream = Cons (1, <lazy>)
# list_of_stream s4;;
- : int list = [1; 11; 2; 12; 3; 13; 4; 14]

stream_append の場合、無限ストリームを結合することはできませんが、interleave ならば無限ストリームにも対応することができます。簡単な例を示しましょう。

# let rec ones = Cons (1, lazy ones);;
val ones : int stream = Cons (1, <lazy>)
# list_of_stream (stream_take ones 10);;
- : int list = [1; 1; 1; 1; 1; 1; 1; 1; 1; 1]

# let rec twos = Cons (2, lazy twos);;
val twos : int stream = Cons (2, <lazy>)
# list_of_stream (stream_take twos 10);;
- : int list = [2; 2; 2; 2; 2; 2; 2; 2; 2; 2]

# list_of_stream (stream_take (interleave ones twos) 10);;
- : int list = [1; 2; 1; 2; 1; 2; 1; 2; 1; 2]

ones は 1 を無限に出力するストリームで、twos は 2 を無限に出力するストリームです。stream_append で ones と twos を結合しても無限に 1 を出力するだけですが、interleave で ones と twos を結合すれば、1 と 2 を交互に出力することができます。これで無限ストリームの要素を混ぜ合わせることができます。

●参考 URL

  1. 計算機プログラムの構造と解釈 第二版 (和田英一 訳), 3.5 ストリーム

初版 2008 年 9 月 14 日
改訂 2020 年 7 月 26 日

Copyright (C) 2008-2020 Makoto Hiroi
All rights reserved.

[ PrevPage | OCaml | NextPage ]