M.Hiroi's Home Page

Common Lisp Programming

お気楽 Common Lisp プログラミング入門

[ PrevPage | Common Lisp | NextPage ]

遅延ストリーム (1)

「ストリーム (stream)」はデータの流れを抽象化したデータ構造です。たとえば、ファイル入出力はストリームと考えることができます。リストを使ってストリームを表すこともできます。ただし、単純なリストでは有限個のデータの流れしか表すことができません。ところが、遅延評価を用いると擬似的に無限個のデータを表すことができるようになります。これを「遅延ストリーム」とか「遅延リスト」と呼びます。今回は遅延ストリームについて説明します。

●遅延ストリームの構造

遅延ストリームの基本的な考え方は、必要になったときに新しいデータを生成することです。このときに遅延評価を用います。具体的にはデータを生成する関数を用意し、それを遅延評価してストリームに格納しておきます。そして、必要になった時点で遅延評価しておいた関数を呼び出して値を求めればよいわけです。

今回は遅延ストリームをコンスセルで表すことにします。コンスセルの CAR が現時点での先頭データを表し、CDR が遅延ストリームを生成する関数を格納するプロミスです。次のリストを見てください。

リスト : 遅延ストリーム

;;; 遅延ストリームの生成
(defmacro stream-cons (a b)
  `(cons ,a (delay ,b)))

;;; 先頭要素を参照する
(defun stream-car (s) (car s))

;;; 先頭要素を取り除く
(defun stream-cdr (s) (force (cdr s)))

マクロ stream-cons はコンスセルの CAR にストリームの要素 A を格納し、CDR にプロミスを格納します。プロミスにはストリームを生成する関数 B を格納します。プロミスを force することで、次の要素を格納した遅延ストリームを生成します。ストリームの終端は NIL で表すことにします。

関数 stream-car は遅延ストリーム S から要素を取り出して返します。関数 stream-cdr は S のプロミスを force して、次の要素を格納した遅延ストリームを生成して返します。ようするに、これらのマクロと関数はリスト操作の cons, car, cdr に対応するわけです。

●遅延ストリームの生成

それでは、遅延ストリームを生成する関数を作りましょう。たとえば、low から high までの整数列を生成するストリームは次のようにプログラムすることができます。

リスト : 整数列を生成するストリーム

(defun range (low high &optional (step 1))
  (if (> low high)
      nil
    (stream-cons low (range (+ low step) high))))

関数 range は遅延ストリームを生成して返します。stream-cons の第 1 引数がストリームの初期値になります。そして、プロミスを force すると、(range (+ low step) high) が評価されて次のデータを格納した遅延ストリームが返されます。そして、その中のプロミスを force すると、その次のデータを得ることができます。

簡単な実行例を示しましょう。

* (setq a (range 1 10))

(1 . #S(PROMISE ...))
* (stream-car a)

1
* (stream-car (stream-cdr a))

2
* (stream-car (stream-cdr (stream-cdr a)))

3
* (stream-car (stream-cdr (stream-cdr (stream-cdr a))))

4

このように、CDR 部のプロミスを force することで、次々とデータを生成することができます。

もう一つ、簡単な例を示しましょう。フィボナッチ数列を生成する遅延ストリームを作ります。次のリストを見てください。

リスト : フィボナッチ数列を生成する遅延ストリーム

(defun fibonacci (a b)
  (stream-cons a (fibonacci b (+ a b))))

関数 fibonacci の引数 A がフィボナッチ数列の最初の項で、B が次の項です。したがって、プロミスに (fibonacci b (+ a b)) を格納しておけば、force することでフィボナッチ数列を生成することができます。Common Lisp は多倍長整数をサポートしているので、メモリの許す限りフィボナッチ数列を生成することができます。

* (setq b (fibonacci 0 1))

(0 . #S(PROMISE ...))
* (stream-car b)

0
* (stream-car (stream-cdr b))

1
* (stream-car (stream-cdr (stream-cdr b)))

1
* (stream-car (stream-cdr (stream-cdr (stream-cdr b))))

2

これらの関数の動作を一般化すると、次のような関数を定義することができます。

リスト : ストリームの生成 (逆畳み込み)

(defun stream-unfold (iterate seed &optional (pred (lambda (x) (declare (ignore x)) nil)))
  (if (funcall pred seed)
      nil
    (stream-cons seed (stream-unfold iterate (funcall iterate seed) pred))))

関数 stream-unfold は初項 SEED を受け取り、次項を関数 ITERATE で生成します。stream-cons の第 1 引数に SEED を渡して、第 2 引数で stream-unfold を呼び出すときに (funcall iterate seed) の返り値を渡します。引数 PRED は終了条件を表す述語です。(funcall pred seed) が真であればストリームの終端 (NIL) を返します。PRED のデフォルトは無条件で NIL を返すので無限ストリームになります。

簡単な実行例を示します。

* (setq c (stream-unfold #'1+ 1))

(1 . #S(PROMISE ...))
* (stream-car c)

1
* (stream-car (stream-cdr c))

2
* (stream-car (stream-cdr (stream-cdr c)))

3
* (stream-car (stream-cdr (stream-cdr (stream-cdr c))))

4
* (setq d (stream-unfold (lambda (xs) (list (second xs) (apply #'+ xs))) '(0 1)))

((0 1) . #S(PROMISE ...))
* (stream-car d)

(0 1)
* (stream-car (stream-cdr d))

(1 1)
* (stream-car (stream-cdr (stream-cdr d)))

(1 2)
* (stream-car (stream-cdr (stream-cdr (stream-cdr d))))

(2 3)
* (stream-car (stream-cdr (stream-cdr (stream-cdr (stream-cdr d)))))

(3 5)

stream-unfold はストリームを生成しますが、リストを生成する関数 unfold を定義することもできます。興味のある方は 補足: 逆畳み込み をお読みください。

●遅延ストリームの変換

リストを遅延ストリームに変換することも簡単にできます。次のリストを見てください。

リスト : リストを遅延ストリームに変換

(defun list-to-stream (xs)
  (if (null xs)
      nil
    (stream-cons (car xs) (list-to-stream (cdr xs)))))

関数 list-to-stream はリスト XS の先頭から要素を順番に取り出して、遅延ストリームに格納して返すだけです。

簡単な実行例を示します。

* (setq e (list-to-stream '(1 2 3 4 5)))

(1 . #S(PROMISE ...))
* (stream-car e)

1
* (stream-car (stream-cdr e))

2
* (stream-car (stream-cdr (stream-cdr e)))

3
* (setq f (list-to-stream '(a b c d e)))

(A . #S(PROMISE ...))
* (stream-car f)

A
* (stream-car (stream-cdr f))

B
* (stream-car (stream-cdr (stream-cdr f)))

C

逆に、有限のストリームをリストに変換することも簡単です。次のリストを見てください。

リスト : 有限ストリームをリストに変換

(defun stream-to-list (s)
  (loop until (null s) collect (stream-car s)
        do (setq s (stream-cdr s))))

ループマクロでストリーム S が空になるまで、要素 (stream-car s) を collect で集めるだけです。

簡単な実行例を示します。

* f

(A . #S(PROMISE ...))
* (stream-to-list f)

(A B C D E)

●遅延ストリームの操作関数

次は遅延ストリームを操作する関数を作りましょう。最初は n 番目の要素を求める関数 stream-ref です。本稿では Common Lisp のリストに合わせて先頭の要素を 0 番目とします。

リスト : n 番目の要素を求める

(defun stream-ref (s n)
  (if (zerop n)
      (stream-car s)
    (stream-ref (stream-cdr s) (1- n))))

stream-ref は stream-cdr でデータを生成し、それを N 回繰り返すことで N 番目の要素を求めます。stream-cdr は遅延ストリームを返すことに注意してください。あとは、N が 0 になったら stream-car でストリームの要素を取り出せばいいわけです。

ストリームから n 個の要素を取り出してストリームに格納して返す関数 stream-take と、n 個の要素を取り除く関数 stream-drop も同様にプログラムすることができます。

リスト : 先頭から n 個の要素を取り出す

(defun stream-take (s n)
  (if (or (null s) (zerop n))
      nil
    (stream-cons (stream-car s) 
                 (stream-take (stream-cdr s) (1- n)))))
リスト : 先頭から n 個の要素を取り除く

(defun stream-drop (s n)
  (if (or (null s) (zerop n))
      s
    (stream-drop (stream-cdr s) (1- n))))

どちらの関数も引数 S が遅延ストリームで、引数 N が要素の個数です。S が空になるか、N が 0 になるまで処理を繰り返します。stream-take は stream-car で要素を取り出し、それを新しいストリームに追加して返します。stream-take は有限の遅延ストリームを返すことに注意してください。stream-drop は stream-cdr を n 回繰り返すだけです。

簡単な実行例を示します。

* (setq fibo (fibonacci 0 1))

(0 . #S(PROMISE ...))
* (dotimes (x 10) (print (stream-ref fibo x)))

0
1
1
2
3
5
8
13
21
34
NIL
* (stream-to-list (stream-take fibo 16))

(0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610)
* (stream-to-list (stream-take (stream-drop fibo 20) 10))

(6765 10946 17711 28657 46368 75025 121393 196418 317811 514229)

変数 FIBO にフィボナッチ数列を生成するストリームをセットします。stream-ref で順番に要素を 10 個求めると、その値はフィボナッチ数列になります。同様に、stream-take で 10 個の要素を取り出すと、リストの要素はフィボナッチ数列になります。stream-drop で 20 個の要素を取り除き、そのあと stream-take で 10 個の要素を取り出すと、21 番目以降のフィボナッチ数列を求めることができます。

●遅延ストリームの連結

次は、2 つの遅延ストリームを受け取って 1 つの遅延ストリームを返す関数を考えます。一番簡単な操作は 2 つの遅延ストリームを結合することです。次のリストを見てください。

リスト : 遅延ストリームの結合

(defun stream-append (s1 s2)
  (if (null s1)
      s2
    (stream-cons (stream-car s1)
                 (stream-append (stream-cdr s1) s2))))

関数 stream-append はストリーム S1 と S2 を結合したストリームを返します。処理は簡単で、S1 の要素を順番に取り出していき、S1 が空になったら S2 を返すだけです。

簡単な実行例を示しましょう。

* (setq s1 (range 1 4))

(1 . #S(PROMISE ...))
* (setq s2 (range 5 8))

(5 . #S(PROMISE ...))
* (setq s3 (stream-append s1 s2))

(1 . #S(PROMISE ...))
* (stream-to-list s3)

(1 2 3 4 5 6 7 8)

次は遅延ストリーム S1 と S2 の要素を交互に出力するストリームを作ります。次のリストを見てください。

リスト : 遅延ストリームの要素を交互に出力

(defun interleave (s1 s2)
  (if (null s1)
      s2
    (stream-cons (stream-car s1)
                 (interleave s2 (stream-cdr s1)))))

関数 interleave はストリーム S1 と S2 を受け取ります。そして、S1 の要素を新しい遅延ストリームに格納したら、次は S2 の要素を新しい遅延ストリームに格納します。これはプロミスで interleave を呼び出すとき、引数 S1 と S2 の順番を交換するだけです。このとき、S1 は stream-cdr で次の要素を求めます。これで S1 と S2 の要素を交互に出力することができます。

簡単な実行例を示しましょう。

* (stream-to-list (interleave s1 s2))

(1 5 2 6 3 7 4 8)

stream-append の場合、無限ストリームを結合することはできませんが、interleave ならば無限ストリームにも対応することができます。簡単な例を示しましょう。

* (setq ones (stream-cons 1 ones))

(1 . #S(PROMISE ...))
* (setq twos (stream-cons 2 twos))

(2 . #S(PROMISE ...))
* (stream-to-list (stream-take ones 10))

(1 1 1 1 1 1 1 1 1 1)
* (stream-to-list (stream-take twos 10))

(2 2 2 2 2 2 2 2 2 2)
* (stream-to-list (stream-take (interleave ones twos) 10))

(1 2 1 2 1 2 1 2 1 2)

ONES は 1 を無限に出力するストリームで、TWOS は 2 を無限に出力するストリームです。stream-append で ONES と TWOS を結合しても無限に 1 を出力するだけですが、interleave で ONES と TWOS を結合すれば、1 と 2 を交互に出力することができます。これで無限ストリームの要素を混ぜ合わせることができます。

●高階関数

次は遅延ストリーム用の高階関数を作りましょう。

リスト : 高階関数

;;; マッピング
(defun stream-map (proc &rest s)
  (if (member nil s)
      nil
    (stream-cons (apply proc (mapcar #'stream-car s))
                 (apply #'stream-map proc (mapcar #'stream-cdr s)))))

;;; フィルター
(defun stream-filter (pred s)
  (cond
   ((null s) nil)
   ((funcall pred (stream-car s))
    (stream-cons (stream-car s)
                 (stream-filter pred (stream-cdr s))))
   (t (stream-filter pred (stream-cdr s)))))

;;; 畳み込み
(defun stream-fold-left (proc a s)
  (if (null s)
      a
    (stream-fold-left proc (funcall proc a (stream-car s)) (stream-cdr s))))

(defun stream-fold-right (proc a s)
  (if (null s)
      a
    (funcall proc (stream-car s) (stream-fold-right proc a (stream-cdr s)))))

;;; 累積値を格納した遅延ストリームを返す
(defun stream-scan-left (fn a xs)
  (stream-cons a (if (null xs)
                     nil
                   (stream-scan-left fn (funcall fn (stream-car xs) a) (stream-cdr xs)))))

;;; 巡回
(defun stream-for-each (proc s)
  (unless (null s)
    (funcall proc (stream-car s))
    (stream-for-each proc (stream-cdr s))))

stream-map は引数のストリームの要素に関数 PROC を適用した結果を新しいストリームに格納して返します。stream-map は複数の遅延ストリームを受け取ることができます。stream-filter は述語 PRED が真を返す要素だけを新しいストリームに格納して返します。

stream-fold-left と stream-fold-right は遅延ストリームに対して畳み込み処理を行います。stream-for-each は遅延ストリームの要素に関数 PROC を適用します。無限ストリームの場合、これらの関数は処理が終了しないので注意してください。

stream-scan-left は遅延ストリーム XS の先頭から畳み込みを行い、計算途中の累積値を格納した遅延ストリームを返します。stream-fold-left と違って、XS が無限ストリームでも動作します。

簡単な実行例を示しましょう。

* (setq s1 (stream-unfold #'1+ 1))

(1 . #S(PROMISE ...))
* (setq s2 (stream-map (lambda (x) (* x x)) s1))

(1 . #S(PROMISE ...))
* (stream-to-list (stream-take s2 10))

(1 4 9 16 25 36 49 64 81 100)
* (setq s3 (stream-filter #'evenp s1))

(2 . #S(PROMISE ...))
* (stream-to-list (stream-take s3 10))

(2 4 6 8 10 12 14 16 18 20)
* (stream-to-list (stream-take s3 10))

(2 4 6 8 10 12 14 16 18 20)
* (stream-fold-left #'+ 0 (stream-take s1 10))

55
* (stream-fold-right #'+ 0 (stream-take s1 10))

55
* (stream-for-each #'print (stream-take s1 10))

1
2
3
4
5
6
7
8
9
10
NIL
* (setq s2 (stream-scan-left #'+ 0 s1))

(0 . #S(PROMISE ...))
* (stream-to-list (stream-take s2 20))

(0 1 3 6 10 15 21 28 36 45 55 66 78 91 105 120 136 153 171 190)
* (stream-to-list (stream-take (stream-scan-left #'* 1 s1) 20))

(1 1 2 6 24 120 720 5040 40320 362880 3628800 39916800 479001600 6227020800
 87178291200 1307674368000 20922789888000 355687428096000 6402373705728000
 121645100408832000)

変数 S1 に 1 から始まる整数列を生成する遅延ストリームをセットします。次に、S1 の要素を 2 乗する遅延ストリームを stream-map で生成して変数 S2 にセットします。stream-take で S2 から要素を 10 個取り出すと、S1 の要素を 2 乗した値になります。

S1 から偶数列の遅延ストリームを得るには、引数が偶数のときに真を返す述語を stream-filter に渡します。その返り値を変数 S3 にセットして、stream-take で 10 個の要素を取り出すと、リストの要素は 2 から 20 までの値になります。

stream-take で有限個の遅延ストリームを生成すると畳み込みを行うことができます。stream-fold-left と stream-fold-right で要素の合計値を求めると 55 になります。もちろん、stream-for-each も正常に動作します。

stream-scan-left に関数 + と初期値 0 を渡すと、S1 の累積度数を求めることができます。関数 * と初期値 1 と S1 を渡すと、階乗の数列を生成することができます。

●stream-map の便利な使い方

stream-map は複数の遅延ストリームを受け取ることができるので、それらの遅延ストリームに対していろいろな処理を定義することができます。次の例を見てください。

* (defun add-stream (s1 s2) (stream-map #'+ s1 s2))

ADD-STREAM
* (setq s1 (range 1 4))

(1 . #S(PROMISE ...))
* (setq s2 (range 11 14))

(11 . #S(PROMISE ...))
* (setq s3 (add-stream s1 s2))

(12 . #S(PROMISE ...))
* (stream-to-list s3)

(12 14 16 18)

add-stream は S1 と S2 の要素を加算した遅延ストリームを返します。この add-stream を使うと、整数を生成する遅延ストリームは次のように定義することができます。

* (setq ones (stream-cons 1 ones))

(1 . #S(PROMISE ...))
* (setq ints (stream-cons 1 (add-stream ones ints)))

(1 . #S(PROMISE ...))
* (stream-to-list (stream-take ints 10))

(1 2 3 4 5 6 7 8 9 10)

ストリーム INTS は、現在の INTS に 1 を足し算することで整数を生成しています。これで整数が生成できるとは不思議ですね。INTS の動作を図に示すと、次のようになります。

*ones* = Cons(1, delay *ones*)
       = Cons(1, lazy_obj1)

*ints* = Cons(1, delay (add-stream *ones* *ints*))
       = Cons(1, lazy_obj2)

lazy_obj2 = Cons(1 + 1, delay (add-stream (force lazy_obj1) (force lazy_obj2)))
          => Cons(2, delay (add-stream (force lazy_obj1) (force lazy_obj2)))
          => Cons(2, lazy_obj3)

lazy_obj3 => Cons(3, lazy (add-stream (force lazy_obj1) (force lazy_obj3)))
          => Cons(3, lazy_obj4)


        図 : ストリーム ints の動作

ONES を Cons(1, lazy_obj1) と表し、INTS を Cons(1, lazy_obj2) と表します。lazy_obj はプロミスを表します。INTS で次の要素を生成するとき、lazy_obj2 を force します。すると、add-stream に ONES と INTS が適用され、ストリームの要素 2 とプロミス lazy_obj3 が生成されます。このとき、lazy_obj3 の内容は add-stream (force lazy_obj1) (force lazy_obj2) になります。

次の要素を生成するときは、lazy_obj3 を force します。lazy_obj1 は Cons(1, lazy_obj1) に、lazy_obj2 は Cons(2, lazy_obj3) に評価されるので、ストリームの要素は 1 + 2 = 3 になり、プロミス lazy_obj4 の内容は add-stream (force lazy_obj1) (force lazy_obj3) になります。そして、このプロミスを force することで次の要素を求めることができます。

このように、プロミスの中に現時点の整数を保持しておき、そこに 1 を足し算することで整数列を生成しているわけです。ここで、プロミスは評価結果をキャッシュしているので、整数 n の次の値を簡単に計算できることに注意してください。もしも、プロミスを単純なクロージャで実装した場合、整数 n を求めるため再計算が行われるので、効率はとても悪くなります。

同様の方法でフィボナッチ数列を生成するストリームを定義することができます。

リスト : フィボナッチ数列の生成

(setq fibs (stream-cons 0 (stream-cons 1 (add-stream (stream-cdr fibs) fibs))))

FIBS が現在のフィボナッチ数列を表していて、(stream-cdr fibs) で次の要素を求めます。そして、それらを足し算することで、その次の要素を求めています。この場合、ストリームの初期値として 2 つの要素が必要になることに注意してください。

それでは実行してみましょう。

* (setq fibs (stream-cons 0 (stream-cons 1 (add-stream (stream-cdr fibs) fibs))))

(0 . #S(PROMISE ...))
* (stream-to-list (stream-take fibs 20))

(0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181)

このように、2 つのストリームを使ってフィボナッチ数列を生成することができます。なお、stream-scan-left を使ってもフィボナッチ数列を定義することができます。

* (setq fibs (stream-cons 0 (stream-scan-left #'+ 1 fibo)))

(0 . #S(PROMISE ...))
* (stream-to-list (stream-take fibs 20))

(0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181)

●stream-flatmap

次は高階関数 stream-flatmap を作りましょう。flatmap は map の結果を平坦化する関数で、具体的には map が返すリストの要素を append で連結する動作になります。引数がリストの場合、次のように定義することができます。

リスト : マッピングした結果を平坦化する

(defun flatmap (proc xs)
  (apply #'append (mapcar proc xs)))

拙作のページ 継続渡しスタイル で作成した関数 flatten を使うと、すべてのリストに対して平坦化を行いますが、apply を使ってリストの要素を append でつなげば、リストを一段階だけ平坦化することができます。

簡単な実行例を示します。

* (defun flatmap (proc xs) (apply #'append (mapcar proc xs)))

FLATMAP
* (flatmap (lambda (x) (list x x)) '(1 2 3 4 5 6 7 8))

(1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8)

stream-flatmap を定義する場合、次のように stream-append を使うと問題が発生します。

リスト : stream-flatmap の定義 (間違い)

(defun stream-flatmap (proc s)
  (if (null s)
      nil
    (stream-append (funcall proc (stream-car s))
                   (stream-flatmap proc (stream-cdr s)))))

Common Lisp の関数は正格評価なので、stream-append を実行する前に引数が評価されます。つまり、stream-flatmap の評価は遅延されるのではなく、遅延ストリームが空になるまで stream-flatmap が再帰呼び出しされるのです。これでは無限ストリームに対応することができません。

そこで、引数を遅延評価する関数 stream-append-delay を作ることにします。プログラムは次のようになります。

リスト : stream-append-delay と stream-flatmap

;;; 遅延ストリームの連結 (遅延評価版)
(defun stream-append-delay (s1 s2)
  (if (null s1)
      (force s2)
    (stream-cons (stream-car s1)
                 (stream-append-delay (stream-cdr s1) s2))))

;;; マッピングの結果を平坦化する
(defun stream-flatmap (proc s)
  (if (null s)
      nil
    (stream-append-delay (funcall proc (stream-car s))
                         (delay (stream-flatmap proc (stream-cdr s))))))

stream-append-delay は stream-append とほぼ同じですが、遅延ストリーム S1 が空になったらプロミス S2 を force で評価するところが異なります。stream-flatmap では、stream-appned のかわりに stream-append-delay を使います。このとき、delay で生成したプロミスを引数に渡します。

それでは実行してみましょう。

* (setq s1 (stream-unfold #'1+ 1))

(1 . #S(PROMISE ...))
* (setq s2 (stream-flatmap (lambda (x) (range 1 x)) s1))

(1 . #S(PROMISE ...))
* (stream-to-list (stream-take s2 55))

(1 1 2 1 2 3 1 2 3 4 1 2 3 4 5 1 2 3 4 5 6 1 2 3 4 5 6 7 1 2 3 4 5 6 7 8 1 2 3
 4 5 6 7 8 9 1 2 3 4 5 6 7 8 9 10)

s1 は無限ストリームになりますが、stream-flatmap は正常に動作していますね。

●stream-take-while と stream-drop-while

次は、遅延ストリームの先頭から述語が真を返す要素を取り出す stream-take-while と要素を取り除く stream-drop-while を作ります。

リスト : stream-take-while と stream-drop-while

;;; 述語 pred が真を返す要素を取り出す
(defun stream-take-while (pred s)
  (if (not (funcall pred (stream-car s)))
      nil
    (stream-cons (stream-car s)
                 (stream-take-while pred (stream-cdr s)))))

;;; 述語 pred が真を返す要素を取り除く
(defun stream-drop-while (pred s)
  (if (not (funcall pred (stream-car s)))
      s
    (stream-drop-while pred (stream-cdr s))))

どちらの関数も難しいところはないと思います。簡単な実行例を示しましょう。

* (setq s1 (stream-unfold #'1+ 1))

(1 . #S(PROMISE ...))
* (stream-to-list (stream-take-while (lambda (x) (< x 10)) s1))

(1 2 3 4 5 6 7 8 9)
* (stream-to-list (stream-take (stream-drop-while (lambda (x) (> x 10)) s1) 10))

(10 11 12 13 14 15 16 17 18 19)

●組 (pair) を生成する遅延ストリーム

次は、2 つのストリームからその要素の組み合わせを生成するストリームを作りましょう。要素が n 個のストリームの場合、組み合わせは n * n 個あります。次の図を見てください。

(a0, b0) (a0, b1) (a0, b2) ... (a0, bn)
(a1, b0) (a1, b1) (a1, b2) ... (a1, bn)
(a2, b0) (a2, b1) (a2, b2) ... (a2, bn)

                           ...

(an, b0) (an, b1) (an, b2) ... (an, bn)


        図 : n * n 個の組

これは「直積集合」を求めることと同じです。遅延ストリームが有限であれば、stream-flatmap と stream-map を使って簡単にプログラムできます。

リスト : 組の生成 (1)

(defun pair-stream (s1 s2)
  (stream-flatmap
   (lambda (x)
     (stream-map (lambda (y) (list x y)) s2))
   s1))

実行例を示します。

* (stream-to-list (pair-stream (range 1 4) (range 5 8)))

((1 5) (1 6) (1 7) (1 8) (2 5) (2 6) (2 7) (2 8) (3 5) (3 6) (3 7) (3 8) (4 5)
 (4 6) (4 7) (4 8))

ところが、この方法では無限ストリームに対応できません。実際、S2 に無限ストリームを渡した場合、S1 の最初の要素を a0 とすると (a0 S2の要素) という組しか生成されません。実際に試してみましょう。

* (stream-to-list (stream-take (pair-stream (range 1 4) (stream-unfold #'1+ 5)) 16))

((1 5) (1 6) (1 7) (1 8) (1 9) (1 10) (1 11) (1 12) (1 13) (1 14) (1 15) (1 16)
 (1 17) (1 18) (1 19) (1 20))

そこで、下図に示すように、対角線上に組を生成していくことにします。

   | a0  a1  a2  a3  a4  a5
---+-----------------------------
b0 | 0   1   3   6   10  15  ...
   |
b1 | 2   4   7   11  16  ...
   |
b2 | 5   8   12  17  ...
   |
b3 | 9   13  18  ...
   |
b4 | 14  19  ...
   |
b5 | 20 ...
   |
   | ...
   |


図 : 無限ストリームによる組の生成

図を見ればおわかりのように、対角線の要素数を n とすると、組は (an-1 b0), (an-2 b1), ..., (a1 bn-2), (a0 bn-1) となっています。これは、S1 から n 個の要素を取り出したリストと、S2 から n 個の要素を取り出して反転したリストを zip でまとめた形になっています。プログラムは次のようになります。

リスト : 組の生成 (2)

(defun pair-stream1 (s1 s2 &optional (n 1))
  (let ((ys (list-to-stream (nreverse (stream-to-list (stream-take s2 n))))))
    (stream-append-delay
     (stream-map (lambda (x y) (list x y)) (stream-take s1 n) ys)
     (delay (pair-stream1 s1 s2 (1+ n))))))

関数 pair-stream1 の引数 N が対角線上の要素数を表します。S2 から取り出したリストを nreverse で反転し、それをストリームに変換して変数 YS にセットします。S1 と YS の要素の組は stream-map を使えば簡単に生成できます。あとは、stream-appned-delay で stream-map と pair-stream1 を連結すればいいわけです。これで無限ストリームに対応することができます。

それでは実行してみましょう。

* (setq ps (pair-stream1 (stream-unfold #'1+ 1) (stream-unfold #'1+ 1)))

((1 1) . #S(PROMISE ...))
* (stream-to-list (stream-take ps 55))

((1 1) (1 2) (2 1) (1 3) (2 2) (3 1) (1 4) (2 3) (3 2) (4 1) (1 5) (2 4) (3 3)
 (4 2) (5 1) (1 6) (2 5) (3 4) (4 3) (5 2) (6 1) (1 7) (2 6) (3 5) (4 4) (5 3)
 (6 2) (7 1) (1 8) (2 7) (3 6) (4 5) (5 4) (6 3) (7 2) (8 1) (1 9) (2 8) (3 7)
 (4 6) (5 5) (6 4) (7 3) (8 2) (9 1) (1 10) (2 9) (3 8) (4 7) (5 6) (6 5) (7 4)
 (8 3) (9 2) (10 1))
* (stream-ref ps 10)

(1 5)
* (stream-ref ps 54)

(10 1)
* (stream-ref ps 100)

(10 5)

正常に動作していますね。

●補足: 逆畳み込み

畳み込みはリストの要素に関数を適用して値を求めますが、畳み込みとは逆の操作、つまり初期値と関数を受け取ってリストを生成する関数を考えることができます。これを「解きほぐし」とか「逆畳み込み」といいます。次のリストを見てください。

リスト : 逆畳み込み

(defun unfold (p f g seed &optional (tail-gen (lambda (x) (declare (ignore x)) nil)))
  (if (funcall p seed)
      (funcall tail-gen seed)
    (cons (funcall f seed)
          (unfold p f g (funcall g seed) tail-gen))))

(defun unfold-right (p f g seed &optional (tail nil))
  (labels ((iter (seed acc)
             (if (funcall p seed)
                 acc
               (iter (funcall g seed) (cons (funcall f seed) acc)))))
    (iter seed tail)))

関数 unfold と unfold-right は畳み込みを行う fold-right とfold-left の逆変換に相当する処理で、unfold と unfold-right の仕様は Scheme のライブラリ SRFI-1 を参考にしました。

unfold は値 seed に関数 f を適用し、その要素をリストに格納して返します。引数 p は終了条件を表す関数で、p が真を返すときリストの終端を関数 tail-gen で生成して返します。一般に、tail-gen は nil を返すのが普通です。関数 g は seed の値を更新するために使用します。したがって、生成されるリストの要素は次のようになります。

( (f (g seed))                   ; g を 1 回適用
  (f (g (g seed)))               ; g を 2 回適用
  (f (g (g (g seed))))           ; g を 3 回適用
  ...
  (f (g (g ... (g seed) ...))) ) ; g を n 回適用

リストの長さが n の場合、最後の要素は g を n 回適用し、その結果に f を適用することになります。unfold-right は生成されるリストの要素が unfold の逆になります。また、引数 tail は関数値ではなくリストの終端を表す値になります。

簡単な例を示しましょう。

* (unfold (lambda (x) (> x 10)) #'identity #'1+ 1)

(1 2 3 4 5 6 7 8 9 10)
* (unfold-right (lambda (x) (> x 10)) #'identity #'1+ 1)

(10 9 8 7 6 5 4 3 2 1)

このように、unfold を使って数列を生成する関数 iota を実現することができます。また、関数 identity のかわりに他の関数を渡すことで、関数 tabulate と同じ動作を実現できます。

unfold と unfold-right の seed は、数値だけではなくリストを渡すこともできます。たとえば、畳み込みを行う fold-right に cons を渡すと copy-list を実現できますが、解きほぐしを行う unfold で car と cdr を渡しても copy-list を実現することができます。

* (fold-right #'cons nil '(1 2 3 4 5))

(1 2 3 4 5)
* (unfold #'null #'car #'cdr '(1 2 3 4 5))

(1 2 3 4 5)

また、unfold を使って関数 maplist を実現することもできます。次の例を見てください。

* (maplist #'identity '(1 2 3 4 5))

((1 2 3 4 5) (2 3 4 5) (3 4 5) (4 5) (5))
* (unfold #'null #'identity #'cdr '(1 2 3 4 5))

((1 2 3 4 5) (2 3 4 5) (3 4 5) (4 5) (5))

unfold で identity のかわりに他の関数を渡すと、maplist と同じ動作になります。

●参考文献, URL

  1. "Structure and Interpretation of Computer Programs (SICP)" 3.5 Streams
  2. Gauche ユーザリファレンス: 6.19 遅延評価

初版 2008 年 11 月 9 日
改訂 2017 年 2 月 19 日
改訂 2020 年 4 月 11 日

Copyright (C) 2008-2020 Makoto Hiroi
All rights reserved.

[ PrevPage | Common Lisp | NextPage ]