M.Hiroi's Home Page

Common Lisp Programming

お気楽 Common Lisp プログラミング入門

[ PrevPage | Common Lisp | NextPage ]

遅延ストリーム (2)

●遅延ストリームの併合

次は、要素を昇順に出力する 2 つの遅延ストリームを併合 (マージ, merge) する関数を作りましょう。次のリストを見てください。

リスト : 遅延ストリームのマージ

(defun stream-merge (s1 s2)
  (cond
   ((null s1) s2)
   ((null s2) s1)
   ((<= (stream-car s1) (stream-car s2))
    (stream-cons (stream-car s1) (stream-merge (stream-cdr s1) s2)))
   (t
    (stream-cons (stream-car s2) (stream-merge s1 (stream-cdr s2))))))

stream-merge は 2 つの遅延ストリームを併合して新しい遅延ストリームを返します。S1 が空であれば S2 を返し、S2 が空ならば S1 を返します。そうでなければ、遅延ストリームの先頭要素を比較します。S1 の要素が S2 の要素以下ならば S1 の要素を、そうでなければ S2 の要素を遅延ストリームに格納します。

簡単な実行例を示しましょう。

* (setq s1 (stream-unfold (lambda (x) (+ x 2)) 1))

(1 . #S(PROMISE ...))
* (setq s2 (stream-unfold (lambda (x) (+ x 2)) 2))

(2 . #S(PROMISE ...))
* (setq s3 (stream-merge s1 s2))

(1 . #S(PROMISE ...))
* (stream-to-list (stream-take s3 20))

(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20)
* (stream-to-list (stream-take (stream-merge s1 s1) 20))

(1 1 3 3 5 5 7 7 9 9 11 11 13 13 15 15 17 17 19 19)

●集合演算

ここで、遅延ストリームには重複要素が存在せず、要素は昇順に出力されることを前提にすると、遅延ストリームでも集合演算を行うことができます。次のリストを見てください。

リスト : 集合演算

;;; 和集合
(defun stream-union (s1 s2)
  (cond
   ((null s1) s2)
   ((null s2) s1)
   ((= (stream-car s1) (stream-car s2))
    (stream-cons (stream-car s1)
                 (stream-union (stream-cdr s1) (stream-cdr s2))))
   ((< (stream-car s1) (stream-car s2))
    (stream-cons (stream-car s1)
                 (stream-union (stream-cdr s1) s2)))
   (t
    (stream-cons (stream-car s2)
                 (stream-union s1 (stream-cdr s2))))))

;;; 積集合
(defun stream-intersect (s1 s2)
  (cond
   ((or (null s1) (null s2)) nil)
   ((= (stream-car s1) (stream-car s2))
    (stream-cons (stream-car s1)
                 (stream-intersect (stream-cdr s1) (stream-cdr s2))))
   ((< (stream-car s1) (stream-car s2))
    (stream-intersect (stream-cdr s1) s2))
   (t
    (stream-intersect s1 (stream-cdr s2)))))

stream-union は S1 と S2 から要素を取り出して、小さいほうを遅延ストリームに追加します。等しい場合は要素をひとつだけ追加します。このとき、S1 と S2 の両方から先頭要素を取り除くことに注意してください。

stream-intersect も簡単です。S1, S2 の先頭要素を比較して、等しい場合はその要素を遅延ストリームに追加します。S1 の要素が S2 の要素よりも小さい場合は、S1 を一つ進めて次の要素を調べます。S2 の要素が小さい場合は s2 の次の要素を調べます。

簡単な実行例を示しましょう。

* (setq s1 (stream-scan-left #'+ 1 (stream-unfold #'1+ 2)))

(1 . #S(PROMISE ...))
* (setq s2 (stream-map (lambda (x) (* x x)) (stream-unfold #'1+ 1)))

(1 . #S(PROMISE ...))
* (stream-to-list (stream-take s1 20))

(1 3 6 10 15 21 28 36 45 55 66 78 91 105 120 136 153 171 190 210)
* (stream-to-list (stream-take s2 20))

(1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361 400)
* (stream-to-list (stream-take (stream-union s1 s2) 20))

(1 3 4 6 9 10 15 16 21 25 28 36 45 49 55 64 66 78 81 91)
* (stream-to-list (stream-take (stream-intersect s1 s2) 7))

(1 36 1225 41616 1413721 48024900 1631432881)

遅延ストリーム S1 は「三角数」、S2 は「四角数」を表します。これらの遅延ストリームを stream-union でまとめると、三角数または四角数の数列になります。stream-intersect でまとめると、三角数かつ四角数の数列 (平方三角数) になります。平方三角数は拙作のページ Puzzle DE Progamming 多角数 でも取り上げています。興味のある方はお読みくださいませ。

●ハミングの問題

ここで stream-union を使うと簡単に解ける問題を紹介しましょう。

[ハミングの問題]

7 以上の素数で割り切れない正の整数を小さい順に N 個求めよ

参考文献 : 奥村晴彦,『C言語による最新アルゴリズム事典』, 技術評論社, 1991 (361 ページより引用)

7 以上の素数で割り切れない正の整数は、素因子が 2, 3, 5 しかない自然数のことで、これを「ハミング数 (Hamming Numbers)」といいます。ハミング数は素因数分解したとき、2i * 3j * 5k (i, j, k >= 0) の形式になります。たとえば、100 以下のハミング数は次のようになります。

1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20, 24, 25, 27, 30, 32, 36, 40, 45, 48, 50, 
54, 60, 64, 72, 75, 80, 81, 90, 96, 100

遅延ストリームを使うと「ハミングの問題」は簡単に解くことができます。小さい順にハミング数を出力する遅延ストリームを hs としましょう。hs は 1 から始まるので次のように定義できます。

(defvar hs (stream-cons 1 (...))

最初の要素は 1 なので、それに 2, 3, 5 を掛け算した値 (2, 3, 5) もハミング数になります。この値は次の S 式で生成することができます。

(stream-map (lambda (x) (* x 2) hs)
(stream-map (lambda (x) (* x 3) hs)
(stream-map (lambda (x) (* x 5) hs)

あとは、これらの遅延ストリームを stream-union でひとつにまとめて、小さい順に出力すればいいわけです。

プログラムと実行結果を示します。

リスト : ハミングの問題

(defvar *hs*)
(setq *hs
      (stream-cons
       1
       (stream-union
         (stream-map (lambda (x) (* x 2)) *hs*)
         (stream-union (stream-map (lambda (x) (* x 3)) *hs*)
                       (stream-map (lambda (x) (* x 5)) *hs*)))))
* (stream-to-list (stream-take *hs* 100))

(1 2 3 4 5 6 8 9 10 12 15 16 18 20 24 25 27 30 32 36 40 45 48 50 54 60 64 72 75
 80 81 90 96 100 108 120 125 128 135 144 150 160 162 180 192 200 216 225 240
 243 250 256 270 288 300 320 324 360 375 384 400 405 432 450 480 486 500 512
 540 576 600 625 640 648 675 720 729 750 768 800 810 864 900 960 972 1000 1024
 1080 1125 1152 1200 1215 1250 1280 1296 1350 1440 1458 1500 1536)

●順列の生成

次は遅延ストリームを使って順列を生成するプログラムを作ってみましょう。遅延ストリームを使う場合、再帰呼び出しの一番深いところで順列が完成するようにプログラムするとうまくいきません。要素が n 個の順列を生成する場合、n - 1 個の順列を生成するストリームを生成し、そこに要素を一つ加えて n 個の順列を生成すると考えます。

基本的には、拙作のページ 順列と組み合わせ で作成した、順列をリストに格納して返すプログラム [順列の生成 (4)] と同じです。このプログラムを遅延ストリームに対応させると次のようになります。

リスト : 遅延ストリームによる順列の生成

(defun make-perm (n s)
  (if (zerop n)
      (stream-cons nil nil)
    (stream-flatmap
     (lambda (x)
         (stream-map (lambda (y) (cons x y))
                     (make-perm (1- n)
                                (stream-filter (lambda (z) (not (eql x z))) s))))
     s)))

関数 make-perm はストリーム S の中から要素を N 個選ぶ順列を生成します。N = 0 の場合は空リストを格納したストリームを返します。あとは、stream-flatmap のラムダ式の中で、make-perm を再帰呼び出しして N - 1 個を選ぶ順列を生成します。ストリーム S から要素 X を取り除くため、stream-filter を使っています。これで順列を生成するストリームを作ることができます。

それでは実際に試してみましょう。

* (defvar *ps* (make-perm 4 (range 1 4)))

*PS*
* (stream-to-list *ps*)

((1 2 3 4) (1 2 4 3) (1 3 2 4) (1 3 4 2) (1 4 2 3) (1 4 3 2) (2 1 3 4)
 (2 1 4 3) (2 3 1 4) (2 3 4 1) (2 4 1 3) (2 4 3 1) (3 1 2 4) (3 1 4 2)
 (3 2 1 4) (3 2 4 1) (3 4 1 2) (3 4 2 1) (4 1 2 3) (4 1 3 2) (4 2 1 3)
 (4 2 3 1) (4 3 1 2) (4 3 2 1))

24 通りの順列をすべて求めることができました。

●8クイーンの解法

同様に、遅延ストリームを使って 8 クイーンを解くことができます。

リスト : 8 クイーンの解法 (遅延ストリーム版)

;;; 衝突のチェック
(defun attack (q xs &optional (n 1))
  (cond
   ((null xs) nil)
   ((or (= (+ q n) (car xs)) (= (- q n) (car xs))) t)
   (t (attack q (cdr xs) (1+ n)))))

;;; N Queen の解を求める
(defun queen (s)
  (if (null s)
      (stream-cons nil nil)
    (stream-filter
     (lambda (ls)
       (if (null ls)
           t
         (not (attack (car ls) (cdr ls)))))
     (stream-flatmap
      (lambda (x)
        (stream-map (lambda (y) (cons x y))
                    (queen (stream-filter (lambda (z) (not (eql x z))) s))))
      s))))

関数 queen は make-perm とほぼ同じですが、追加したクイーンが他のクイーンと衝突している場合は stream-filter を使って取り除いています。衝突をチェックする関数 attack は拙作のページ 生成検定法 の Eight Queens Problem で作成したプログラムと同じです。

それでは実行してみましょう。

* (stream-to-list (stream-take (queen (range 1 8)) 10))

((1 5 8 6 3 7 2 4) (1 6 8 3 7 4 2 5) (1 7 4 6 8 2 5 3) (1 7 5 8 2 4 6 3)
 (2 4 6 8 3 1 7 5) (2 5 7 1 3 8 6 4) (2 5 7 4 1 8 6 3) (2 6 1 7 4 8 3 5)
 (2 6 8 3 1 4 7 5) (2 7 3 6 8 5 1 4))

解の総数は全部で 92 通りあります。

●木の巡回と遅延ストリーム

拙作のページ 継続渡しスタイル では、リストを木とみなして、木を巡回するプログラムを作りました。プログラムを CPS に変換すると、遅延ストリームに対応するのも簡単です。次のリストを見てください。

リスト : 木の巡回 (遅延ストリーム版)

(defun stream-of-tree (ls cont)
  (cond
   ((null ls) (funcall cont))
   ((atom ls)
    (stream-cons ls (funcall cont)))
   (t
    (stream-of-tree
     (car ls)
     (lambda ()
       (stream-of-tree
        (cdr ls)
        (lambda () (funcall cont))))))))

stream-of-tree は木を巡回してその要素を順番に出力する遅延ストリームを生成します。stream-of-tree は LS が葉の場合に stream-cons で遅延ストリームを生成して返します。このとき、LS が遅延ストリームの要素になり、プロミスには継続 CONT の呼び出しを格納します。このプロミスを force することで、次の要素を求めることができます。

なお、stream-of-tree を呼び出すときに渡す継続が一番最後に呼び出されるので、遅延ストリームの終端 NIL を返すように定義してください。

簡単な実行例を示しましょう。

* (setq tree (stream-of-tree '(a (b (c (d . e) f) g) h) (lambda () nil)))

(A . #S(PROMISE ...))
* (stream-to-list tree)

(A B C D E F G H)

●ツリーマッチング

stream-of-tree を使うと、2 つの木を比較する関数 same-fringe-p を簡単に作ることができます。同じ葉を同じ並びで持つ場合、same-fringe-p は t を返します。次の例を見てください。

(same-fringe-p '(1 2 (3) 4) '(1 2 (3 4)) => T
(same-fringe-p '(1 2 (3) 4) '(1 2 (4) 3) => NIL

最初の例の場合、木の構造は違いますが、要素はどちらの木も 1, 2, 3, 4 の順番で並んでいるので、same-fringe-p は T を返します。次の例では、木の構造は同じですが、 3 と 4 の順番が逆になっています。この場合、same-fringe-p は NIL を返します。

プログラムは次のようになります。

リスト : ツリーマッチング

(defun same-fringe-p (tree1 tree2 &key (test #'eql))
  (labels ((iter (s1 s2)
             (cond
              ((and (null s1) (null s2)) t)
              ((or (null s1) (null s2)) nil)
              ((funcall test (stream-car s1) (stream-car s2))
               (iter (stream-cdr s1) (stream-cdr s2)))
              (t nil))))
    (iter (stream-of-tree tree1 (lambda () nil))
          (stream-of-tree tree2 (lambda () nil)))))

実際の処理は局所関数 iter で行います。same-fringe-p は stream-of-tree で木の遅延ストリームを生成して iter に渡します。あとは、遅延ストリームから要素を一つずつ取り出して、それが等しいかチェックするだけです。

それでは実行例を示します。

* (same-fringe-p '(1 2 (3 4 (5 . 6) 7) 8) '(1 2 (3 4 (5 6) 7) 8))

T
* (same-fringe-p '(1 2 (3 4 (5 . 6) 7) 8) '(1 2 (3 4 (6 5) 7) 8))

NIL

正常に動作していますね。

●エラトステネスの篩

最後に簡単な例題として、ストリームを使って素数を求めるプログラムを作ってみましょう。

考え方は簡単です。最初に 2 から始まる整数列を生成するストリームを用意します。2 は素数なので、素数ストリームの要素になります。次に、この整数列から 2 で割り切れる整数を取り除き除きます。これは stream-filter を使うと簡単です。2 で割り切れる整数が取り除かれたので、次の要素は 3 になります。今度は 3 で割り切れる整数を取り除けばいいのです。これも stream-filter を使えば簡単です。

このとき、入力用のストリームは 2 で割り切れる整数が取り除かれています。したがって、このストリームに対して 3 で割り切れる整数を取り除くように stream-filter を設定すればいいわけです。このように、素数を見つけたらそれで割り切れる整数を取り除いていくアルゴリズムを「エラトステネスの篩」といいます。

ようするに、2 から始まる整数ストリームに対して、見つけた素数 2, 3, 5, 7, 11, ... を順番に stream-fiter で設定して素数でない整数をふるい落としていくわけです。

プログラムは次のようになります。

リスト : 素数の生成

(defun sieve (s)
  (stream-cons
    (stream-car s)
    (sieve (stream-filter (lambda (x) (/= (mod x (stream-car s)) 0))
                          (stream-cdr s)))))

sieve には 2 から始まる整数列を生成するストリームを渡します。stream-cdr でプロミスを force すると、stream-filter により整数列から 2 で割り切れる整数を取り除いたストリームが返されます。次の要素 3 を取り出すとき、このストリームに対して 3 で割り切れる整数を取り除くことになるので、2 と 3 で割り切れる整数が取り除かれることになります。次の要素は 5 になりますが、そのストリームからさらに 5 で割り切れる整数が stream-filter で取り除かれることになります。

このように stream-filter を重ねて設定していくことで、素数でない整数をふるい落としていくことができるわけです。それでは実行してみましょう。

* (setq ps (sieve (stream-unfold #'1+ 2)))

(2 . #S(PROMISE ...))
* (stream-to-list (stream-take ps 25))

(2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97)
* (stream-to-list (stream-take ps 100))

(2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103
 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199
 211 223 227 229 233 239 241 251 257 263 269 271 277 281 283 293 307 311 313
 317 331 337 347 349 353 359 367 373 379 383 389 397 401 409 419 421 431 433
 439 443 449 457 461 463 467 479 487 491 499 503 509 521 523 541)

stream-unfold で 2 から始まる整数列を sieve に渡します。100 以下の素数は全部で 25 個あります。

●より高速な方法

関数 sieve は簡単にプログラムできますが、生成する素数の個数が多くなると、その実行速度はかなり遅くなります。実をいうと、sieve なみに簡単で sieve よりも高速な方法があります。

整数 n が素数か確かめる簡単な方法は、√n 以下の素数で割り切れるか試してみることです。割り切れる素数 m があれば、n は素数ではありません。そうでなければ、n は素数であることがわかります。

これをそのままプログラムすると次のようになります。

リスト : 素数列の生成 [追記 (2020/04/18)]

(defvar *primes* (stream-cons 2 (stream-cons 3 (stream-cons 5 (primes-from 7)))))

(defun primes-from (n)
  (if (primep n)
      (stream-cons n (primes-from (+ n 2)))
    (primes-from (+ n 2))))

(defun primep (n)
  (every (lambda (p) (/= (mod n p) 0))
         (stream-to-list (stream-take-while (lambda (p) (<= (* p p) n)) *primes*))))

変数 *PRIMES* は無限の素数列を表します。実際に素数を生成する処理は関数 primes-from で行います。primes-from は述語 primep を呼び出して N が素数かチェックします。そうであれば、stream-cons で N を遅延ストリームに追加します。そうでなければ primes-from を再帰呼び出しするだけです。偶数は素数ではないので、引数 n には奇数を与えていることに注意してください。

primep も簡単です。stream-take-while で *PRIMES* から √N 以下の素数列を取り出します。√N 以下の素数は生成済みなので、*PRIMES* から stream-take-while で取り出すことが可能です。ここでは√N のかわりに条件を P * P <= N としています。あとは、関数 every を使って、取り出した素数で n が割り切れないことを確認するだけです。

それでは実行してみましょう。

* (stream-to-list (stream-take *primes* 25))

(2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97)
* (stream-ref *primes* 99)

541
* (stream-ref *primes* 500)

3581

100 以下の素数は全部で 25 個あります。また、100 番目の素数は 541 になります。Common Lisp のリストは 0 から数えるので、(stream-ref primes 99) で 100 番目の素数になります。

実行時間ですが、stream-ref で 3001 番目の素数を求めてみました。実行環境は Ubunts 18.04 (Windows Subsystem for Linux), Intel Core i5-6200U 2.30GHz, sbcl version 1.4.5 です。

* (time (stream-ref (sieve (stream-unfold #'1+ 2)) 3001))

Evaluation took:
  2.564 seconds of real time
  2.562500 seconds of total run time (1.437500 user, 1.125000 system)
  [ Run times consist of 1.766 seconds GC time, and 0.797 seconds non-GC time. ]
  99.92% CPU
  6,151,226,756 processor cycles
  367,067,216 bytes consed

27479
* (time (stream-ref *primes* 3001))

Evaluation took:
  0.036 seconds of real time
  0.031250 seconds of total run time (0.031250 user, 0.000000 system)
  86.11% CPU
  84,087,369 processor cycles
  37,087,136 bytes consed

27479

sieve よりも primes のほうが高速になりました。興味のある方はいろいろ試してみてください。

-- 追記 (2020/04/18) --------

関数 primp は stream-take-while で新しい遅延ストリームを生成し、さらに stream-to-list でリストに変換しているので、実行時間はとても遅くなります。次のように、遅延ストリームをたどって判定したほうが速くなります。

リスト : 素数の判定

(defun primep (n &optional (ps (stream-cdr *primes*)))
  (let ((p (stream-car ps)))
    (cond
     ((> (* p p) n) t)
     ((zerop (mod n p)) nil)
     (t (primep n (stream-cdr ps))))))

●双子素数

差が 2 である素数の組を「双子素数 (twin prime)」といいます。素数列 *PRIMES* を使うと双子素数は簡単に求めることができます。

* (setq twin (stream-filter (lambda (xs) (= (- (second xs) (first xs)) 2))
 (stream-map (lambda (x y) (list x y)) *primes* (stream-cdr *primes*))))

((3 5) . #S(PROMISE ...)
* (stream-to-list (stream-take twin 100))

((3 5) (5 7) (11 13) (17 19) (29 31) (41 43) (59 61) (71 73) (101 103)
 (107 109) (137 139) (149 151) (179 181) (191 193) (197 199) (227 229)
 (239 241) (269 271) (281 283) (311 313) (347 349) (419 421) (431 433)
 (461 463) (521 523) (569 571) (599 601) (617 619) (641 643) (659 661)
 (809 811) (821 823) (827 829) (857 859) (881 883) (1019 1021) (1031 1033)
 (1049 1051) (1061 1063) (1091 1093) (1151 1153) (1229 1231) (1277 1279)
 (1289 1291) (1301 1303) (1319 1321) (1427 1429) (1451 1453) (1481 1483)
 (1487 1489) (1607 1609) (1619 1621) (1667 1669) (1697 1699) (1721 1723)
 (1787 1789) (1871 1873) (1877 1879) (1931 1933) (1949 1951) (1997 1999)
 (2027 2029) (2081 2083) (2087 2089) (2111 2113) (2129 2131) (2141 2143)
 (2237 2239) (2267 2269) (2309 2311) (2339 2341) (2381 2383) (2549 2551)
 (2591 2593) (2657 2659) (2687 2689) (2711 2713) (2729 2731) (2789 2791)
 (2801 2803) (2969 2971) (2999 3001) (3119 3121) (3167 3169) (3251 3253)
 (3257 3259) (3299 3301) (3329 3331) (3359 3361) (3371 3373) (3389 3391)
 (3461 3463) (3467 3469) (3527 3529) (3539 3541) (3557 3559) (3581 3583)
 (3671 3673) (3767 3769) (3821 3823))

ところで、双子素数 - Wikipedia によると、『双子素数は無数に存在するかという問題、いわゆる「双子素数の予想」や「双子素数の問題」は、いまだに数学上の未解決問題である。無数に存在するだろう、とは、多くの数論学者が予想している。』 とのことです。

●プロミスを使わずに遅延ストリームを実装する

遅延ストリームはプロミスを使わずにクロージャだけで実装することもできます。次のリストを見てください。

リスト : 遅延ストリーム (その2)

;;; 遅延ストリームの生成
(defmacro stream-cons (a b)
  `(cons ,a (lambda () ,b)))

;;; アクセス関数
(defun stream-car (s) (car s))
(defun stream-cdr (s)
  (when (functionp (cdr s))
    (setf (cdr s) (funcall (cdr s))))
  (cdr s))

遅延ストリームをコンスセルで表すのは同じです。stream-cons では、プロミスのかわりにクロージャを CDR 部にセットします。stream-car は今までと同じです。そして、stream-cdr で遅延ストリーム S をたどるとき、(cdr s) が関数であれば funcall で (cdr s) を評価し、その返り値で S の CDR 部を破壊的に修正します。最後に (cdr s) を返します。

たとえば、S が (item1 . #<CLOSURE ...>) だとしましょう。(funcall (cdr s)) の返り値は遅延ストリームになります。それが (item2 . #<CLOSURE ...>) だとすると、これが S の CDR 部にセットされるので、遅延ストリームは次のようになります。

(item1 . #<CLOSURE ...>) == stream-cdr => (item1 item2 . #<CLOSURE ...>)

このように、クロージャの評価結果をリストに格納していくことで、効率的に遅延ストリームを実装することができます。

あとのプログラムは修正しなくても大丈夫です。簡単な実行例を示しましょう。

* (setq s1 (range 1 4))

(1 . #<CLOSURE (LAMBDA () :IN RANGE) {100290D72B}>)
* (stream-car (stream-cdr s1))

2
* s1

(1 2 . #<CLOSURE (LAMBDA () :IN RANGE) {100290EFDB}>)
* (stream-car (stream-cdr (stream-cdr s1)))

3
* s1

(1 2 3 . #<CLOSURE (LAMBDA () :IN RANGE) {10029109CB}>)
* (stream-car (stream-cdr (stream-cdr (stream-cdr s1))))

4
* s1

(1 2 3 4 . #<CLOSURE (LAMBDA () :IN RANGE) {100291264B}>)
* (stream-car (stream-cdr (stream-cdr (stream-cdr (stream-cdr s1)))))

NIL
* s1

(1 2 3 4)
* (stream-to-list (stream-map (lambda (x y) (list x y)) (stream-unfold #'1+ 1) '(a b c d e)))

((1 A) (2 B) (3 C) (4 D) (5 E))
* (setq queens (queen (range 1 8)))

((1 5 8 6 3 7 2 4) . #<CLOSURE (LAMBDA () :IN STREAM-FILTER) {1002CB403B}>)
* (stream-ref queens 20)

(3 6 4 1 8 5 7 2)
* queens

((1 5 8 6 3 7 2 4) (1 6 8 3 7 4 2 5) (1 7 4 6 8 2 5 3) (1 7 5 8 2 4 6 3)
 (2 4 6 8 3 1 7 5) (2 5 7 1 3 8 6 4) (2 5 7 4 1 8 6 3) (2 6 1 7 4 8 3 5)
 (2 6 8 3 1 4 7 5) (2 7 3 6 8 5 1 4) (2 7 5 8 1 4 6 3) (2 8 6 1 3 5 7 4)
 (3 1 7 5 8 2 4 6) (3 5 2 8 1 7 4 6) (3 5 2 8 6 4 7 1) (3 5 7 1 4 2 8 6)
 (3 5 8 4 1 7 2 6) (3 6 2 5 8 1 7 4) (3 6 2 7 1 4 8 5) (3 6 2 7 5 1 8 4)
 (3 6 4 1 8 5 7 2) . #<CLOSURE (LAMBDA () :IN STREAM-FILTER) {10039487EB}>)
* (stream-ref *primes* 50)

233
* *primes*

(2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103
 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199
 211 223 227 229 233 . #<CLOSURE (LAMBDA () :IN PRIMES-FROM) {10029380CB}>)

stream-map などの操作関数に普通のリストを渡すと、有限の遅延ストリームとして動作します。興味のある方はいろいろ試してみてください。

●参考文献, URL

  1. "Structure and Interpretation of Computer Programs (SICP)" 3.5 Streams
  2. Gauche ユーザリファレンス: 6.19 遅延評価

●プログラムリスト

;;;
;;; lazys.lisp : 遅延評価と遅延ストリーム
;;;
;;;              Copyright (C) 2008-2020 Makoto Hiroi
;;;

;;;
;;; 遅延評価
;;;

;;; プロミスの定義
(defstruct promise (result nil) thunk)

(defmacro delay (expr)
  `(make-promise :thunk (lambda () ,expr)))

(defun force (ps)
  (when (promise-thunk ps)
    (setf (promise-result ps) (funcall (promise-thunk ps))
          (promise-thunk  ps) nil))
  (promise-result ps))

;;;
;;; 遅延ストリーム
;;;

;;; 遅延ストリームの生成
(defmacro stream-cons (a b)
  `(cons ,a (delay ,b)))

;;; アクセス関数
(defun stream-car (s) (car s))
(defun stream-cdr (s) (force (cdr s)))

;;; 数列の生成
(defun range (low high &optional (step 1))
  (if (> low high)
      nil
    (stream-cons low (range (+ low step) high step))))

;;; フィボナッチ数列
(defun fibonacci (a b)
  (stream-cons a (fibonacci b (+ a b))))

;;; ストリームの生成
(defun stream-unfold (iterate seed &optional (pred (lambda (x) (declare (ignore x)) nil)))
  (if (funcall pred seed)
      nil
    (stream-cons seed (stream-unfold iterate (funcall iterate seed) pred))))

;;; リストを遅延ストリームに変換
(defun list-to-stream (xs)
  (if (null xs)
      nil
    (stream-cons (car xs) (list-to-stream (cdr xs)))))

;;; 有限ストリームをリストに変換
(defun stream-to-list (s)
  (loop until (null s) collect (stream-car s)
        do (setq s (stream-cdr s))))

;;; n 番目の要素を求める
(defun stream-ref (s n)
  (if (zerop n)
      (stream-car s)
    (stream-ref (stream-cdr s) (1- n))))

;;; 先頭から n 個の要素を取り出す
(defun stream-take (s n)
  (if (or (null s) (zerop n))
      nil
    (stream-cons (stream-car s) (stream-take (stream-cdr s) (1- n)))))

;;; 先頭から n 個の要素を取り除く
(defun stream-drop (s n)
  (if (or (null s) (zerop n))
      s
    (stream-drop (stream-cdr s) (1- n))))

;;; ストリームの結合
(defun stream-append (s1 s2)
  (if (null s1)
      s2
    (stream-cons (stream-car s1)
                 (stream-append (stream-cdr s1) s2))))

(defun interleave (s1 s2)
  (if (null s1)
      s2
    (stream-cons (stream-car s1)
                 (interleave s2 (stream-cdr s1)))))

;;; 遅延評価版
(defun stream-append-delay (s1 s2)
  (if (null s1)
      (force s2)
    (stream-cons (stream-car s1)
                 (stream-append-delay (stream-cdr s1) s2))))

(defun interleave-delay (s1 s2)
  (if (null s1)
      (force s2)
    (stream-cons (stream-car s1)
                 (interleave-delay (force s2) (cdr s1)))))

;;;
;;; 高階関数
;;;

;;; マップ関数
(defun stream-map (proc &rest s)
  (if (member nil s)
      nil
    (stream-cons (apply proc (mapcar #'stream-car s))
                 (apply #'stream-map proc (mapcar #'stream-cdr s)))))

;;; マッピングの結果を平坦化する
(defun stream-flatmap (proc s)
  (if (null s)
      nil
    (stream-append-delay (funcall proc (stream-car s))
                         (delay (stream-flatmap proc (stream-cdr s))))))

;;; フィルター
(defun stream-filter (pred s)
  (cond
   ((null s) nil)
   ((funcall pred (stream-car s))
    (stream-cons (stream-car s)
                 (stream-filter pred (stream-cdr s))))
   (t (stream-filter pred (stream-cdr s)))))

;;; 畳み込み
(defun stream-fold-left (proc a s)
  (if (null s)
      a
    (stream-fold-left proc (funcall proc a (stream-car s)) (stream-cdr s))))

(defun stream-fold-right (proc a s)
  (if (null s)
      a
    (funcall proc (stream-car s) (stream-fold-right proc a (stream-cdr s)))))

;;; 累積値を格納した遅延ストリームを返す
(defun stream-scan-left (fn a xs)
  (stream-cons a (if (null xs)
                     nil
                   (stream-scan-left fn (funcall fn (stream-car xs) a) (stream-cdr xs)))))

;;; 巡回
(defun stream-for-each (proc s)
  (unless (null s)
    (funcall proc (stream-car s))
    (stream-for-each proc (stream-cdr s))))

;;; 述語 pred が真を返す要素を取り出す
(defun stream-take-while (pred s)
  (if (not (funcall pred (stream-car s)))
      nil
    (stream-cons (stream-car s)
                 (stream-take-while pred (stream-cdr s)))))

;;; 述語 pred が真を返す要素を取り除く
(defun stream-drop-while (pred s)
  (if (not (funcall pred (stream-car s)))
      s
    (stream-drop-while pred (stream-cdr s))))

;;; 組 (pair) の生成
(defun pair-stream (s1 s2)
  (stream-flatmap
   (lambda (x)
     (stream-map (lambda (y) (list x y)) s2))
   s1))

;;; 無限ストリームに対応
(defun pair-stream1 (s1 s2 &optional (n 1))
  (let ((ys (list-to-stream (nreverse (stream-to-list (stream-take s2 n))))))
    (stream-append-delay
     (stream-map (lambda (x y) (list x y)) (stream-take s1 n) ys)
     (delay (pair-stream1 s1 s2 (1+ n))))))

;;; 遅延ストリームの併合
(defun stream-merge (s1 s2)
  (cond
   ((null s1) s2)
   ((null s2) s1)
   ((<= (stream-car s1) (stream-car s2))
    (stream-cons (stream-car s1) (stream-merge (stream-cdr s1) s2)))
   (t
    (stream-cons (stream-car s2) (stream-merge s1 (stream-cdr s2))))))

;;;
;;; 集合演算
;;;

;;; 和集合
(defun stream-union (s1 s2)
  (cond
   ((null s1) s2)
   ((null s2) s1)
   ((= (stream-car s1) (stream-car s2))
    (stream-cons (stream-car s1)
                 (stream-union (stream-cdr s1) (stream-cdr s2))))
   ((< (stream-car s1) (stream-car s2))
    (stream-cons (stream-car s1)
                 (stream-union (stream-cdr s1) s2)))
   (t
    (stream-cons (stream-car s2)
                 (stream-union s1 (stream-cdr s2))))))

;;; 積集合
(defun stream-intersect (s1 s2)
  (cond
   ((or (null s1) (null s2)) nil)
   ((= (stream-car s1) (stream-car s2))
    (stream-cons (stream-car s1)
                 (stream-intersect (stream-cdr s1) (stream-cdr s2))))
   ((< (stream-car s1) (stream-car s2))
    (stream-intersect (stream-cdr s1) s2))
   (t
    (stream-intersect s1 (stream-cdr s2)))))

;;; ハミングの問題
(defvar *hs*)
(setq *hs*
      (stream-cons
       1
       (stream-union
        (stream-map (lambda (x) (* x 2)) *hs*)
        (stream-union (stream-map (lambda (x) (* x 3)) *hs*)
                      (stream-map (lambda (x) (* x 5)) *hs*)))))

;;; 素数の生成
(defun sieve (s)
  (stream-cons (stream-car s)
               (sieve (stream-filter (lambda (x) (/= (mod x (stream-car s)) 0))
                                     (stream-cdr s)))))

;;; 別解
(declaim (ftype (function (integer) t) primes-from))
(defvar *primes* (stream-cons 2 (stream-cons 3 (stream-cons 5 (primes-from 7)))))

(defun primep (n)
  (every (lambda (p) (/= (mod n p) 0))
         (stream-to-list (stream-take-while (lambda (p) (<= (* p p) n)) *primes*))))

(defun primes-from (n)
  (if (primep n)
      (stream-cons n (primes-from (+ n 2)))
    (primes-from (+ n 2))))

;;; 順列の生成
(defun make-perm (n s)
  (if (zerop n)
      (stream-cons nil nil)
    (stream-flatmap
     (lambda (x)
       (stream-map (lambda (y) (cons x y))
                   (make-perm (1- n)
                              (stream-filter (lambda (z) (not (eql x z))) s))))
     s)))

;;; 8 Queen の解法
(defun attack (q xs &optional (n 1))
  (cond
   ((null xs) nil)
   ((or (= (+ q n) (car xs)) (= (- q n) (car xs))) t)
   (t (attack q (cdr xs) (1+ n)))))

(defun queen (s)
  (if (null s)
      (stream-cons nil nil)
    (stream-filter
     (lambda (ls)
       (if (null ls)
           t
         (not (attack (car ls) (cdr ls)))))
     (stream-flatmap
      (lambda (x)
        (stream-map (lambda (y) (cons x y))
                    (queen (stream-filter (lambda (z) (not (eql x z))) s))))
      s))))

;;; 木の巡回 (リストを木としてみる)
(defun stream-of-tree (ls cont)
  (cond
   ((null ls) (funcall cont))
   ((atom ls)
    (stream-cons ls (funcall cont)))
   (t
    (stream-of-tree
     (car ls)
     (lambda ()
       (stream-of-tree
        (cdr ls)
        (lambda () (funcall cont))))))))

;;; ツリーマッチング
(defun same-fringe-p (tree1 tree2 &key (test #'eql))
  (labels ((iter (s1 s2)
             (cond
              ((and (null s1) (null s2)) t)
              ((or (null s1) (null s2)) nil)
              ((funcall test (stream-car s1) (stream-car s2))
               (iter (stream-cdr s1) (stream-cdr s2)))
              (t nil))))
    (iter (stream-of-tree tree1 (lambda () nil))
          (stream-of-tree tree2 (lambda () nil)))))

初版 2008 年 11 月 9 日
改訂 2017 年 2 月 19 日
改訂 2020 年 4 月 11 日

Copyright (C) 2008-2020 Makoto Hiroi
All rights reserved.

[ PrevPage | Common Lisp | NextPage ]