M.Hiroi's Home Page

お気楽 Scheme プログラミング入門

応用編 : 遅延ストリーム (3)

Copyright (C) 2017-2020 Makoto Hiroi
All rights reserved.

はじめに

前回まで作成した遅延ストリームは、stream-cons で遅延ストリームを生成するとき、ストリームの要素となる引数を評価していました。たとえば、(stream-cons (func x) ...) とすると、(func x) を評価した値がストリームの要素となります。ここで、ストリームにまだアクセスしていないのに、(func x) が評価されていることに注意してください。もし、(func x) がデータの入力処理だとすると、遅延ストリームを生成するときにデータをひとつ先読みしてしまうことになります。

そこで、コンスセルの CAR 部と CDR 部をまとめて遅延評価することにします。この場合、プロミスが遅延ストリームを表すことになります。今回はこの方法で遅延ストリームを作ってみましょう。

●遅延ストリームをプロミスで表す

遅延ストリームをプロミスで表す場合、その構造はつぎのようになります。

リスト : 遅延ストリーム

(define-library (mylib lazystream1)
  (import (scheme base) (scheme lazy) (scheme case-lambda))
  (export ・・・省略・・・ )
  (begin
    ;; 遅延ストリームの基本関数
    (define-syntax stream-cons
      (syntax-rules ()
        ((_ a b) (delay (cons a b)))))

    ;; 先頭要素を取り出す
    (define (stream-car s) (car (force s)))

    ;; 先頭要素を取り除く
    (define (stream-cdr s) (cdr (force s)))

    ;; ストリームの終端
    (define nil (delay '()))
    (define (stream-empty? s) (null? (force s)))
    (define (stream-empty) nil)

    ・・・省略・・・

  ))

stream-cons は (cons a (delay b)) ではなく (delay (cons a b)) とします。これで stream-cons の引数 a, b が遅延評価されます。stream-car と stream-cdr は遅延ストリーム s を force で評価してから car と cdr を適用します。遅延ストリームはプロミスで表すので、終端 nil の定義は (delay '()) とします。stream-empty? も遅延ストリーム s を force で評価してから null? でチェックします。

●stream-delay

ここで stream-empty? を評価すると、遅延ストリームが force されることに注意してください。たとえば、遅延ストリームを連結する stream-append を次のように定義すると問題が発生します。

リスト : 遅延ストリームの連結 (間違い版)

;; 整数列の生成
(define (stream-range low high)
  (if (> low high)
      nil
      (stream-cons low (stream-range (+ 1 low) high))))

;; 遅延ストリームの連結 (間違い版)
(define (stream-append-bad s1 s2)
  (if (stream-empty? s1)
      s2
      (stream-cons (stream-car s1)
                   (stream-append-bad (stream-cdr s1) s2))))

stream-append-bad でストリームを生成するとき、empty? で s1 が force されることになります。つまり、新しいストリームを生成する前に引数のストリームが評価されてしまうのです。次の例を見てください。

gosh[r7rs.user]> (define s1 (range 1 4))
s1
gosh[r7rs.user]> (define s2 (range 5 8))
s2
gosh[r7rs.user]> (define s3 (stream-append-bad s1 s2))
s3
gosh[r7rs.user]> s1
#<promise ... (forced)>
gosh[r7rs.user]> s2
#<promise ...>
gosh[r7rs.user]> s3
#<promise ...>

s1 と s2 を連結した新しいストリーム s3 を評価していないにもかかわらず、引数のストリーム s1 が force されていることがわかります。この場合、stream-append の本体を delay と force で囲みます。

リスト : 遅延ストリームの連結 (修正版)

(define (stream-append s1 s2)
  (delay
   (force
    (if (stream-empty? s1)
        s2
        (stream-cons (stream-car s1)
                     (stream-append (stream-cdr s1) s2))))))

delay と force で囲むのは無駄なように思いますが、これにより stream-append を評価して遅延ストリームを生成するとき、引数 s1 の遅延ストリームが force されずにすむわけです。

実際には、次に示すようなマクロを定義すると簡単です。

リスト : 式 expr の遅延ストリームを返す

(define-syntax stream-delay
  (syntax-rules ()
    ((_ expr) (delay (force expr)))))
リスト : 遅延ストリームの連結 (完成版)

(define (stream-append s1 s2)
  (stream-delay
    (if (stream-empty? s1)
        s2
        (stream-cons (stream-car s1)
                     (stream-append (stream-cdr s1) s2)))))

簡単な実行例を示します。

gosh[r7rs.user]> (define s4 (stream-range 1 4))
s4
gosh[r7rs.user]> (define s5 (stream-range 5 8))
s5
gosh[r7rs.user]> (define s6 (stream-append s4 s5))
s6
gosh[r7rs.user]> s4
#<promise ...>
gosh[r7rs.user]> s5
#<promise ...>
gosh[r7rs.user]> s6
#<promise ...>
gosh[r7rs.user]> (stream-car s6)
1
gosh[r7rs.user]> (stream-car (stream-cdr s6))
2
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr s6)))
3
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr (stream-cdr s6))))
4
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr (stream-cdr (stream-cdr s6)))))
5

このように、stream-delay を使うことで、新しい遅延ストリームを生成するとき、引数のストリームが force されるのを防止することができます。

同様に stream-map や stream-filter など、遅延ストリームを受け取って新しい遅延ストリームを返す関数は stream-delay で囲む必要があります。詳細はプログラムリストをお読みください。

●実行速度の比較

それでは簡単な実行例として、素数を求めるプログラムで実行速度を比較してみましょう。

リスト : 素数を求める

(define *primes*
  (stream-cons 2 (stream-cons 3 (stream-cons 5 (primes-from 7)))))

(define (primes-from n)
  (if (prime? n)
      (stream-cons n (primes-from (+ n 2)))
      (primes-from (+ n 2))))

(define (prime? n)
  (let loop ((s (stream-cdr *primes*)))
    (let ((p (stream-car s)))
      (cond
       ((> (* p p) n) #t)
       ((zero? (modulo n p)) #f)
       (else
        (loop (stream-cdr s)))))))
リスト : 簡単なテスト

(define (test ps n)
  (let ((s (current-jiffy)))
    (display (stream-ref ps n))
    (newline)
    (inexact (/ (- (current-jiffy) s) (jiffies-per-second)))))

素数列 primes の定義は「遅延ストリーム (1)」で作成したものと同じです。(stream-ref primes 20000) の実行時間を求めたところ、結果は次のようになりました。

lazystream.scm  : 0.29 秒
lazystream1.scm : 0.40 秒

実行環境 : Ubunts 18.04 (Windows Subsystem for Linux), Intel Core i5-6200U 2.30GHz, Gauche ver 0.9.9

今回の遅延ストリームのほうが少し遅くなりました。興味のある方はいろいろ試してみてください。

●問題点

今回は単純に delay と force を使いましたが、この方法では末尾再帰的なアルゴリズムとの相性がよくないことがわかっているそうです。詳しい説明は Gauche のリファレンス「遅延評価」や Yutaka Hara さんの「R7RSのdelay-forceとは何か」をお読みください。


●プログラムリスト

;;;
;;; lazystream1.scm : 遅延ストリーム (CAR 部も遅延評価する)
;;;
;;;                   Copyright (C) 2017-2020 Makoto Hiroi
;;;
(define-library (mylib lazystream1)
  (import (scheme base) (scheme lazy) (scheme case-lambda))
  (export stream-cons stream-car stream-cdr stream-empty? stream-empty stream-delay
          stream-range stream-unfold list->stream stream->list stream-ref
          stream-take stream-drop stream-append stream-append-delay interleave
          stream-map stream-flatmap stream-filter stream-foldl stream-foldr
          stream-scanl stream-for-each stream-take-while stream-drop-while
          stream-merge stream-union stream-intersect stream-perm
          sieve *primes*
   )
  (begin
    ;; 遅延ストリームの基本関数
    (define-syntax stream-cons
      (syntax-rules ()
        ((_ a b) (delay (cons a b)))))

    ;; 先頭要素を取り出す
    (define (stream-car s) (car (force s)))

    ;; 先頭要素を取り除く
    (define (stream-cdr s) (cdr (force s)))

    ;; ストリームの終端
    (define nil (delay '()))
    (define (stream-empty? s) (null? (force s)))
    (define (stream-empty) nil)

    ;; 式 expr の遅延ストリームを返す
    (define-syntax stream-delay
      (syntax-rules ()
        ((_ expr) (delay (force expr)))))

    ;;
    ;; ストリームの生成
    ;;

    ;; 数列を生成するストリーム
    (define (stream-range low high)
      (if (> low high)
          nil
          (stream-cons low (stream-range (+ 1 low) high))))

    ;; 逆畳み込み
    (define stream-unfold
      (case-lambda
       ((iterate seed)
        (stream-unfold iterate seed (lambda (x) #f)))
       ((iterate seed pred)
        (if (pred seed)
            nil
            (stream-cons seed
                         (stream-unfold iterate (iterate seed) pred))))))

    ;; リストとストリームの変換
    (define (list->stream xs)
      (if (null? xs)
          nil
          (stream-cons (car xs) (list->stream (cdr xs)))))

    (define (stream->list s)
      (if (stream-empty? s)
          '()
          (cons (stream-car s) (stream->list (stream-cdr s)))))

    ;; 基本的な操作関数
    (define (stream-ref s n)
      (if (zero? n)
          (stream-car s)
          (stream-ref (stream-cdr s) (- n 1))))

    (define (stream-take s n)
      (stream-delay
       (if (or (stream-empty? s) (zero? n))
           nil
           (stream-cons (stream-car s)
                        (stream-take (stream-cdr s) (- n 1))))))

    (define (stream-drop s n)
      (stream-delay
       (if (or (stream-empty? s) (zero? n))
           s
           (stream-drop (stream-cdr s) (- n 1)))))

    ;; 遅延ストリームの連結
    (define (stream-append s1 s2)
      (stream-delay
       (if (stream-empty? s1)
           s2
           (stream-cons (stream-car s1)
                        (stream-append (stream-cdr s1) s2)))))

    ;; 遅延評価版
    (define (stream-append-delay s1 s2)
      (stream-delay
       (if (stream-empty? s1)
           (force s2)
           (stream-cons (stream-car s1)
                        (stream-append-delay (stream-cdr s1) s2)))))

    ;; ストリームの要素を交互に出力
    (define (interleave s1 s2)
      (stream-delay
       (if (stream-empty? s1)
           s2
           (stream-cons (stream-car s1)
                        (interleave s2 (stream-cdr s1))))))

    ;;
    ;; 高階関数
    ;;

    ;; 簡易版
    (define (any pred xs)
      (if (null? xs)
          #f
          (or (pred (car xs)) (any pred (cdr xs)))))

    ;; マッピング
    (define (map-1 proc s)
      (stream-delay
       (if (stream-empty? s)
           nil
           (stream-cons (proc (stream-car s))
                        (map-1 proc (stream-cdr s))))))

    (define (map-n proc ss)
      (stream-delay
       (if (any stream-empty? ss)
           nil
           (stream-cons (apply proc (map stream-car ss))
                        (map-n proc (map stream-cdr ss))))))

    (define stream-map
      (case-lambda
       ((proc s)
        (map-1 proc s))
       ((proc s . args)
        (map-n proc (cons s args)))))

    ;; マッピングの結果を平坦化する
    (define (stream-flatmap proc s)
      (stream-delay
       (if (stream-empty? s)
           nil
           (stream-append-delay (proc (stream-car s))
                                (delay (stream-flatmap proc (stream-cdr s)))))))

    ;; フィルター
    (define (stream-filter pred s)
      (stream-delay
       (cond
        ((stream-empty? s) nil)
        ((pred (stream-car s))
         (stream-cons (stream-car s)
                      (stream-filter pred (stream-cdr s))))
        (else
         (stream-filter pred (stream-cdr s))))))

    ;; 畳み込み
    (define (foldl-1 proc a s)
      (if (stream-empty? s)
          a
          (foldl-1 proc (proc a (stream-car s)) (stream-cdr s))))

    (define (foldl-n proc a ss)
      (if (any stream-empty? ss)
          a
          (foldl-n proc
                   (apply proc a (map stream-car ss))
                   (map stream-cdr ss))))

    (define stream-foldl
      (case-lambda
       ((proc a s)
        (foldl-1 proc a s))
       ((proc a s . args)
        (foldl-n proc a (cons s args)))))

    (define (foldr-1 proc a s)
      (if (stream-empty? s)
          a
          (proc (foldr-1 proc a (stream-cdr s)) (stream-car s))))

    (define (foldr-n proc a ss)
      (if (any stream-empty? ss)
          a
          (apply proc
                 (foldr-n proc a (map stream-cdr ss))
                 (map stream-car ss))))

    (define stream-foldr
      (case-lambda
       ((proc a s)
        (foldr-1 proc a s))
       ((proc a s . args)
        (foldr-n proc a (cons s args)))))

    (define (scanl-1 proc a s)
      (if (stream-empty? s)
          (stream-cons a nil)
          (stream-cons a (scanl-1 proc (proc a (stream-car s)) (stream-cdr s)))))

    (define (scanl-n proc a ss)
      (if (any stream-empty? ss)
          (stream-cons a nil)
          (stream-cons a (scanl-n proc
                                  (apply proc a (map stream-car ss))
                                  (map stream-cdr ss)))))

    (define stream-scanl
      (case-lambda
       ((proc a s)
        (scanl-1 proc a s))
       ((proc a s . args)
        (scanl-n proc a (cons s args)))))

    ;; 巡回
    (define (for-each-1 proc s)
      (unless
       (stream-empty? s)
       (proc (stream-car s))
       (for-each-1 proc (stream-cdr s))))

    (define (for-each-n proc ss)
      (unless
       (any stream-empty? ss)
       (apply proc (map stream-car ss))
       (for-each-n proc (map stream-cdr ss))))

    (define stream-for-each
      (case-lambda
       ((proc s)
        (for-each-1 proc s))
       ((proc s . args)
        (for-each-n proc (cons s args)))))

    (define (stream-take-while pred s)
      (if (or (stream-empty? s) (not (pred (stream-car s))))
          nil
          (stream-cons (stream-car s)
                       (stream-take-while pred (stream-cdr s)))))

    (define (stream-drop-while pred s)
      (if (or (stream-empty? s) (not (pred (stream-car s))))
          s
          (stream-drop-while pred (stream-cdr s))))

    ;; 遅延ストリームの併合
    (define (stream-merge s1 s2)
      (stream-delay
       (cond
        ((stream-empty? s1) s2)
        ((stream-empty? s2) s1)
        ((<= (stream-car s1) (stream-car s2))
         (stream-cons (stream-car s1) (stream-merge (stream-cdr s1) s2)))
        (else
         (stream-cons (stream-car s2) (stream-merge s1 (stream-cdr s2)))))))

    ;; 和集合
    (define (stream-union s1 s2)
      (stream-delay
       (cond
        ((stream-empty? s1) s2)
        ((stream-empty? s2) s1)
        ((< (stream-car s1) (stream-car s2))
         (stream-cons (stream-car s1)
                      (stream-union (stream-cdr s1) s2)))
        ((> (stream-car s1) (stream-car s2))
         (stream-cons (stream-car s2)
                      (stream-union s1 (stream-cdr s2))))
        (else
         (stream-cons (stream-car s1)
                      (stream-union (stream-cdr s1) (stream-cdr s2)))))))

    ;; 積集合
    (define (stream-intersect s1 s2)
      (stream-delay
       (cond
        ((or (stream-empty? s1) (stream-empty? s2)) nil)
        ((= (stream-car s1) (stream-car s2))
         (stream-cons (stream-car s1)
                      (stream-intersect (stream-cdr s1) (stream-cdr s2))))
        ((< (stream-car s1) (stream-car s2))
         (stream-intersect (stream-cdr s1) s2))
        (else
         (stream-intersect s1 (stream-cdr s2))))))

    ;; 順列の生成
    (define (stream-perm n s)
      (stream-delay
       (if (zero? n)
           (stream-cons '() nil)
           (stream-flatmap
            (lambda (x)
              (stream-map (lambda (y) (cons x y))
                          (stream-perm
                           (- n 1)
                           (stream-filter (lambda (z) (not (eqv? x z))) s))))
            s))))

    ;;
    ;; 素数
    ;;
    (define (sieve s)
      (stream-cons (stream-car s)
                   (sieve (stream-filter
                           (lambda (x) (not (zero? (modulo x (stream-car s)))))
                           (stream-cdr s)))))

    ;; 高速化
    (define *primes*
      (stream-cons 2 (stream-cons 3 (stream-cons 5 (primes-from 7)))))

    (define (primes-from n)
      (if (prime? n)
          (stream-cons n (primes-from (+ n 2)))
          (primes-from (+ n 2))))

    (define (prime? n)
      (let loop ((s (stream-cdr *primes*)))
        (let ((p (stream-car s)))
          (cond
           ((> (* p p) n) #t)
           ((zero? (modulo n p)) #f)
           (else
            (loop (stream-cdr s)))))))

    ))

●簡単な実行例

gosh[r7rs.user]> (import (mylib lazystream1))
gosh[r7rs.user]> (define s1 (stream-unfold (lambda (x) (+ x 1)) 1))
s1
gosh[r7rs.user]> (stream->list (stream-take s1 20))
(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20)
gosh[r7rs.user]> (stream->list (stream-take (stream-map 
(lambda (x) (* x x)) s1) 20))
(1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361 400)
gosh[r7rs.user]> (stream->list (stream-take (stream-filter even? s1) 20))
(2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40)
gosh[r7rs.user]> (stream->list (stream-take (stream-filter odd? s1) 20))
(1 3 5 7 9 11 13 15 17 19 21 23 25 27 29 31 33 35 37 39)

gosh[r7rs.user]> (define (stream-add s1 s2) (stream-map + s1 s2))
stream-add
gosh[r7rs.user]> (define fibo (stream-cons 0 (stream-cons 1 (stream-add 
(stream-cdr fibo) fibo))))
fibo
gosh[r7rs.user]> (stream->list (stream-take fibo 30))
(0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765 10946
 17711 28657 46368 75025 121393 196418 317811 514229)

gosh[r7rs.user]> (define hs (stream-cons 1
 (stream-union (stream-map (lambda (x) (* x 2)) hs)
 (stream-union (stream-map (lambda (x) (* x 3)) hs)
 (stream-map (lambda (x) (* x 5)) hs)))))
hs
gosh[r7rs.user]> (stream-for-each (lambda (x) (display x) (display " ")) 
(stream-take hs 100))
1 2 3 4 5 6 8 9 10 12 15 16 18 20 24 25 27 30 32 36 40 45 48 50 54 60 64 72 75 80 
81 90 96 100 108 120 125 128 135 144 150 160 162 180 192 200 216 225 240 243 250 
256 270 288 300 320 324 360 375 384 400 405 432 450 480 486 500 512 540 576 600 
625 640 648 675 720 729 750 768 800 810 864 900 960 972 1000 1024 1080 1125 1152 
1200 1215 1250 1280 1296 1350 1440 1458 1500 1536 #<undef>

gosh[r7rs.user]> (stream->list (stream-perm 4 (stream-range 1 4)))
((1 2 3 4) (1 2 4 3) (1 3 2 4) (1 3 4 2) (1 4 2 3) (1 4 3 2) (2 1 3 4)
 (2 1 4 3) (2 3 1 4) (2 3 4 1) (2 4 1 3) (2 4 3 1) (3 1 2 4) (3 1 4 2)
 (3 2 1 4) (3 2 4 1) (3 4 1 2) (3 4 2 1) (4 1 2 3) (4 1 3 2) (4 2 1 3)
 (4 2 3 1) (4 3 1 2) (4 3 2 1))
gosh[r7rs.user]> (stream->list (stream-take (stream-perm 8 (stream-range 1 8)) 
10))
((1 2 3 4 5 6 7 8) (1 2 3 4 5 6 8 7) (1 2 3 4 5 7 6 8) (1 2 3 4 5 7 8 6)
 (1 2 3 4 5 8 6 7) (1 2 3 4 5 8 7 6) (1 2 3 4 6 5 7 8) (1 2 3 4 6 5 8 7)
 (1 2 3 4 6 7 5 8) (1 2 3 4 6 7 8 5))

gosh[r7rs.user]> (define (stream-zip s1 s2) (stream-map list s1 s2))
stream-zip
gosh[r7rs.user]> (stream->list (stream-take (stream-zip s1 s1) 10))
((1 1) (2 2) (3 3) (4 4) (5 5) (6 6) (7 7) (8 8) (9 9) (10 10))
gosh[r7rs.user]> (define *twins* (stream-filter (lambda (xs) (= (- (cadr xs) 
(car xs)) 2)) (stream-zip *primes* (stream-cdr *primes*))))
*twins*
gosh[r7rs.user]> (stream->list (stream-take *twins* 10))
((3 5) (5 7) (11 13) (17 19) (29 31) (41 43) (59 61) (71 73) (101 103)
 (107 109))
gosh[r7rs.user]> (stream-for-each display (stream-take *twins* 100))
(3 5)(5 7)(11 13)(17 19)(29 31)(41 43)(59 61)(71 73)(101 103)(107 109)(137 139)
(149 151)(179 181)(191 193)(197 199)(227 229)(239 241)(269 271)(281 283)(311 313)
(347 349)(419 421)(431 433)(461 463)(521 523)(569 571)(599 601)(617 619)(641 643)
(659 661)(809 811)(821 823)(827 829)(857 859)(881 883)(1019 1021)(1031 1033)
(1049 1051)(1061 1063)(1091 1093)(1151 1153)(1229 1231)(1277 1279)(1289 1291)
(1301 1303)(1319 1321)(1427 1429)(1451 1453)(1481 1483)(1487 1489)(1607 1609)
(1619 1621)(1667 1669)(1697 1699)(1721 1723)(1787 1789)(1871 1873)(1877 1879)
(1931 1933)(1949 1951)(1997 1999)(2027 2029)(2081 2083)(2087 2089)(2111 2113)
(2129 2131)(2141 2143)(2237 2239)(2267 2269)(2309 2311)(2339 2341)(2381 2383)
(2549 2551)(2591 2593)(2657 2659)(2687 2689)(2711 2713)(2729 2731)(2789 2791)
(2801 2803)(2969 2971)(2999 3001)(3119 3121)(3167 3169)(3251 3253)(3257 3259)
(3299 3301)(3329 3331)(3359 3361)(3371 3373)(3389 3391)(3461 3463)(3467 3469)
(3527 3529)(3539 3541)(3557 3559)(3581 3583)(3671 3673)(3767 3769)(3821 3823)
#<undef>

初版 2017 年 2 月 17 日
改訂 2020 年 10 月 11 日