M.Hiroi's Home Page

Functional Programming

お気楽 Scheme プログラミング入門

[ PrevPage | Scheme | NextPage ]

遅延ストリーム (1)

「ストリーム (stream)」はデータの流れを抽象化したデータ構造です。たとえば、ファイル入出力はストリームと考えることができます。また、リストを使ってストリームを表すこともできます。ただし、単純なリストでは有限個のデータの流れしか表すことができません。ところが、遅延評価を用いると擬似的に無限個のデータを表すことができるようになります。これを「遅延ストリーム」とか「遅延リスト」と呼びます。今回は遅延ストリームについて説明します。

なお、Scheme には遅延ストリームを扱うライブラリ (SRFI-40, 41, R7RS-large の (scheme stream)) や、遅延シーケンスを扱うライブラリ (SRFI-127, R7RS-large の (scheme lseq)) があります。どちらのライブラリも Gauche でサポートされているので、私たちが遅延ストリームを作る必要はまったくありませんが、Scheme のお勉強ということで、あえてプログラムを作ってみましょう。

●遅延ストリームの構造

遅延ストリームの基本的な考え方は、必要になったときに新しいデータを生成することです。このときに遅延評価を用います。具体的にはデータを生成する関数を用意し、それを遅延評価してストリームに格納しておきます。そして、必要になった時点で遅延評価しておいた関数を呼び出して値を求めればよいわけです。

今回は遅延ストリームをコンスセルで表すことにします。コンスセルの CAR が現時点での先頭データを表し、CDR が遅延ストリームを生成する関数を格納するプロミスです。次のリストを見てください。

リスト : 遅延ストリーム

(define-library (mylib lazystream)
  (import (scheme base) (scheme lazy) (scheme case-lambda))
  (export ・・・省略・・・)
  (begin
    ;; 遅延ストリームの生成
    (define-syntax stream-cons
      (syntax-rules ()
        ((_ a b) (cons a (delay b)))))

    ;; 先頭要素を参照
    (define (stream-car s) (car s))

    ;; 先頭要素を取り除く
    (define (stream-cdr s) (force (cdr s)))

    ;; ストリームの終端
    (define nil '())
    (define stream-empty? null?)
    (define (stream-empty) nil)

    ・・・省略・・・

  ))

マクロ stream-cons はコンスセルの CAR にストリームの要素 a を格納し、CDR にプロミスを格納します。プロミスにはストリームを生成する関数 b を格納します。プロミスを force することで、次の要素を格納した遅延ストリームを生成します。ストリームの終端は nil で表すことにします。nil は空リスト () で表し、遅延ストリームの終端を判定する述語 stream-empty? を用意します。stream-empty? は null? と同じです。

関数 stream-car は遅延ストリーム s から要素を取り出して返します。関数 stream-cdr は s のプロミスを force して、次の要素を格納した遅延ストリームを生成して返します。ようするに、これらのマクロと関数はリスト操作の cons, car, cdr に対応しているわけです。

●遅延ストリームの生成

それでは、遅延ストリームを生成する関数を作りましょう。たとえば、low から high までの整数列を生成するストリームは次のようにプログラムすることができます。

リスト : 整数列を生成するストリーム

(define (stream-range low high)
  (if (> low high)
      nil
      (stream-cons low (stream-range (+ 1 low) high))))

関数 stream-range は遅延ストリームを生成して返します。stream-cons の第 1 引数がストリームの初期値になります。そして、プロミスを force すると、(stream-range (+ 1 low) high) が評価されて次のデータを格納した遅延ストリームが返されます。そして、その中のプロミスを force すると、その次のデータを得ることができます。

簡単な例を示しましょう。

gosh[r7rs.user]> (import (mylib lazystream))
gosh[r7rs.user]> (define a (stream-range 1 10))
a
gosh[r7rs.user]> (stream-car a)
1
gosh[r7rs.user]> (stream-car (stream-cdr a))
2
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr a)))
3
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr (stream-cdr a))))
4

このように、CDR 部のプロミスを force することで、次々とデータを生成することができます。

もう一つ、簡単な例を示しましょう。フィボナッチ数列を生成する遅延ストリームを作ります。次のリストを見てください。

リスト : フィボナッチ数列を生成する遅延ストリーム

(define (fibonacci a b)
  (stream-cons a (fibonacci b (+ a b))))

関数 fibonacci の引数 a がフィボナッチ数列の初項で、b が次項です。したがって、プロミスに (fibonacci b (+ a b)) を格納しておけば、force することでフィボナッチ数列を生成することができます。Scheme は多倍長整数をサポートしているので、メモリの許す限りフィボナッチ数列を生成することができます。

gosh[r7rs.user]> (define (fibonacci a b) (stream-cons a (fibonacci b (+ a b))))
fibonacci
gosh[r7rs.user]> (define b (fibonacci 0 1))
b
gosh[r7rs.user]> (stream-car b)
0
gosh[r7rs.user]> (stream-car (stream-cdr b))
1
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr b)))
1
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr (stream-cdr b))))
2
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr (stream-cdr (stream-cdr b)))))
3

これらの関数の動作を一般化すると、次のような関数を定義することができます。

リスト : 逆畳み込み

(define stream-unfold
  (case-lambda
   ((iterate seed)
    (stream-unfold itreate seed (lambda (x) #f)))
   ((iterate seed pred)
    (if (pred seed)
        nil
        (stream-cons seed
                     (stream-unfold iterate (iterate seed) pred))))))

関数 stream-unfold は引数 seed に初項を受け取り、次項を関数 iterate で生成します。stream-cons の第 1 引数に seed を渡して、第 2 引数で stream-unfold を呼び出すとき、(iterate seed) の返り値を渡します。引数 pred は遅延ストリームの終端を判定する述語です。(pred seed) が真を返す場合、stream-unfold は nil を返します。常に偽を返す関数を pred に渡すと無限ストリームを生成することができます。pred を省略した場合は (lambda (x) #f) を渡すことにします。

簡単な実行例を示します。

gosh[r7rs.user]> (define c (stream-unfold (lambda (x) (+ x 1)) 1))
c
gosh[r7rs.user]> (stream-car c)
1
gosh[r7rs.user]> (stream-car (stream-cdr c))
2
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr c)))
3

gosh[r7rs.user]> (define d (stream-unfold (lambda (xs) (list (cadr xs) (apply + xs))) '(0 1)))
d
gosh[r7rs.user]> (stream-car d)
(0 1)
gosh[r7rs.user]> (stream-car (stream-cdr d))
(1 1)
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr d)))
(1 2)
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr (stream-cdr d))))
(2 3)
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr (stream-cdr (stream-cdr d)))))
(3 5)

●リストと遅延ストリームの変換

リストと遅延ストリームを変換することも簡単にできます。次のリストを見てください。

リスト : リストと遅延ストリームの変換

(define (list->stream xs)
  (if (null? xs)
      nil
      (stream-cons (car xs) (list->stream (cdr xs)))))

(define (stream->list s)
  (if (stream-empty? s)
      '()
      (cons (stream-car s) (stream->list (stream-cdr s)))))

関数 list->stream はリスト xs の先頭から要素を順番に取り出して、遅延ストリームに格納して返すだけです。関数 stream->list は有限ストリーム s をリストに変換します。引数 s が空のストリームであれば空リストを返します。あとは、stream->list を再帰呼び出しして、その返り値に遅延ストリームの先頭要素 (stream-car s) を cons で追加していくだけです。

簡単な実行例を示します。

gosh[r7rs.user]> (define e (list->stream '(1 2 3 4 5)))
e
gosh[r7rs.user]> (stream-car e)
1
gosh[r7rs.user]> (stream-car (stream-cdr e))
2
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr e)))
3
gosh[r7rs.user]> (stream->list e)
(1 2 3 4 5)
gosh[r7rs.user]> (define f (list->stream '(a b c d e)))
f
gosh[r7rs.user]> (stream->list f)
(a b c d e)

●遅延ストリームの操作関数

次は遅延ストリームを操作する関数を作りましょう。最初は n 番目の要素を求める関数 stream-ref です。本稿では Scheme のリストに合わせて先頭の要素を 0 番目とします。

リスト : n 番目の要素を求める

(define (stream-ref s n)
  (if (zero? n)
      (stream-car s)
      (stream-ref (stream-cdr s) (- n 1))))

stream-ref は stream-cdr でデータを生成し、それを n 回繰り返すことで n 番目の要素を求めます。stream-cdr は遅延ストリームを返すことに注意してください。あとは、stream-ref を再帰呼び出しして、n が 0 になったら stream-car でストリームの要素を取り出せばいいわけです。

ストリームから n 個の要素を取り出して新しい遅延ストリームに格納して返す関数 stream-take と、n 個の要素を取り除く関数 stream-drop も同様にプログラムすることができます。

リスト : 先頭から n 個の要素を取り出す

(define (stream-take s n)
  (if (or (stream-empty? s) (zero? n))
      nil
      (stream-cons (stream-car s)
                   (stream-take (stream-cdr s) (- n 1)))))
リスト : 先頭から n 個の要素を取り除く

(define (stream-drop s n)
  (if (or (stream-empty? s) (zero? n))
      s
      (stream-drop (stream-cdr s) (- n 1))))

引数 s が遅延ストリームで、引数 n が取り出す要素の個数です。どちらの関数も s が空になるか、n が 0 になるまで処理を繰り返します。stream-take は stream-car で要素を取り出し、それを stream-cons で新しい遅延ストリームに追加して返します。stream-take は有限ストリームを返すことに注意してください。stream-drop は stream-cdr を n 回繰り返すだけです。

簡単な実行例を示します。

gosh[r7rs.user]> (define fibo (fibonacci 0 1))
fibo
gosh[r7rs.user]> (do ((i 0 (+ i 1))) ((>= i 10)) (display (stream-ref fibo i)) (newline))
0
1
1
2
3
5
8
13
21
34
#t
gosh[r7rs.user]> (stream->list (stream-take fibo 10))
(0 1 1 2 3 5 8 13 21 34)
gosh[r7rs.user]> (stream->list (stream-take (stream-drop fibo 40) 8))
(102334155 165580141 267914296 433494437 701408733 1134903170 1836311903 2971215073)

変数 fibo にフィボナッチ数列を生成するストリームをセットします。stream-ref で順番に要素を 10 個求めると、その値はフィボナッチ数列になっていますね。同様に、stream-take で 10 個の要素を取り出すと、リストの要素はフィボナッチ数列になります。stream-drop で 40 個の要素を取り除き、そのあと stream-take で 10 個の要素を取り出すと、41 番目以降のフィボナッチ数列を求めることができます。

●遅延ストリームの連結

次は、2 つの遅延ストリームを受け取って 1 つの遅延ストリームを返す関数を考えます。一番簡単な操作は 2 つの遅延ストリームを結合することです。次のリストを見てください。

リスト : 遅延ストリームの結合

(define (stream-append s1 s2)
  (if (stream-empty? s1)
      s2
      (stream-cons (stream-car s1)
                   (stream-append (stream-cdr s1) s2))))

関数 stream-append はストリーム s1 と s2 を結合したストリームを返します。処理は簡単で、s1 の要素を順番に取り出していき、s1 が空になったら s2 を返すだけです。

簡単な実行例を示しましょう。

gosh[r7rs.user]> (define s1 (stream-range 1 4))
s1
gosh[r7rs.user]> (define s2 (stream-range 5 8))
s2
gosh[r7rs.user]> (define s3 (stream-append s1 s2))
s3
gosh[r7rs.user]> (stream->list s3)
(1 2 3 4 5 6 7 8)

次は遅延ストリーム s1 と s2 の要素を交互に出力するストリームを作ります。次のリストを見てください。

リスト : 遅延ストリームの要素を交互に出力

(define (interleave s1 s2)
  (if (stream-empty? s1)
      s2
      (stream-cons (stream-car s1)
                   (interleave s2 (stream-cdr s1)))))

関数 interleave はストリーム s1 と s2 を受け取ります。そして、s1 の要素を新しい遅延ストリームに格納したら、次は s2 の要素を新しい遅延ストリームに格納します。これはプロミスで interleave を呼び出すとき、引数 s1 と s2 の順番を交換するだけです。このとき、s1 は stream-cdr で次の要素を求めます。これで s1 と s2 の要素を交互に出力することができます。

簡単な実行例を示しましょう。

gosh[r7rs.user]> (define s4 (interleave s1 s2))
s4
gosh[r7rs.user]> (stream->list s4)
(1 5 2 6 3 7 4 8)

stream-append の場合、無限ストリームを結合することはできませんが、interleave ならば無限ストリームにも対応することができます。簡単な例を示しましょう。

gosh[r7rs.user]> (define ones (stream-cons 1 ones))
ones
gosh[r7rs.user]> (stream->list (stream-take ones 10))
(1 1 1 1 1 1 1 1 1 1)
gosh[r7rs.user]> (define twos (stream-cons 2 twos))
twos
gosh[r7rs.user]> (stream->list (stream-take twos 10))
(2 2 2 2 2 2 2 2 2 2)
gosh[r7rs.user]> (stream->list (stream-take (interleave ones twos) 10))
(1 2 1 2 1 2 1 2 1 2)

ones は 1 を無限に出力するストリームで、twos は 2 を無限に出力するストリームです。stream-append で ones と twos を結合しても無限に 1 を出力するだけですが、interleave で ones と twos を結合すれば、1 と 2 を交互に出力することができます。これで無限ストリームの要素を混ぜ合わせることができます。

●高階関数

次は遅延ストリーム用の高階関数を作りましょう。

リスト : 高階関数 (1)

;; マッピング
(define (map-1 proc s)
  (if (stream-empty? s)
      nil
      (stream-cons (proc (stream-car s))
                   (map-1 proc (stream-cdr s)))))

(define (map-n proc ss)
  (if (member nil ss)
      nil
      (stream-cons (apply proc (map stream-car ss))
                   (map-n proc (map stream-cdr ss)))))

(define stream-map
  (case-lambda
   ((proc s)
    (map-1 proc s))
   ((proc s . args)
    (map-n proc (cons s args)))))

;; フィルター
(define (stream-filter pred s)
  (cond
   ((stream-empty? s) nil)
   ((pred (stream-car s))
    (stream-cons (stream-car s)
                 (stream-filter pred (stream-cdr s))))
   (else
    (stream-filter pred (stream-cdr s)))))

stream-map は引数のストリームの要素に関数 proc を適用した結果を新しいストリームに格納して返します。stream-map は複数の遅延ストリームを受け取ることができます。stream-filter は述語 pred が真を返す要素だけを新しいストリームに格納して返します。

リスト : 高階関数 (2)

;; 畳み込み
(define (foldl-1 proc a s)
  (if (stream-empty? s)
      a
      (foldl-1 proc (proc a (stream-car s)) (stream-cdr s))))

(define (foldl-n proc a ss)
  (if (member nil ss)
      a
      (foldl-n proc
               (apply proc a (map stream-car ss))
               (map stream-cdr ss))))

(define stream-foldl
  (case-lambda
   ((proc a s)
    (foldl-1 proc a s))
   ((proc a s . args)
    (foldl-n proc a (cons s args)))))

(define (foldr-1 proc a s)
  (if (stream-empty? s)
      a
      (proc (foldr-1 proc a (stream-cdr s)) (stream-car s))))

(define (foldr-n proc a ss)
  (if (member nil ss)
      a
      (apply proc
             (foldr-n proc a (map stream-cdr ss))
             (map stream-car ss))))

(define stream-foldr
  (case-lambda
   ((proc a s)
    (foldr-1 proc a s))
   ((proc a s . args)
    (foldr-n proc a (cons s args)))))

stream-foldl と stream-foldr は遅延ストリームに対して畳み込み処理を行います。どちらの関数も複数の遅延ストリームを受け取ることができます。無限ストリームの場合、これらの関数は処理が終了しないので注意してください。

リスト : 高階関数 (3)

;; 累積値を格納した遅延ストリームを返す
(define (scanl-1 proc a s)
  (if (stream-empty? s)
      (stream-cons a nil)
      (stream-cons a (scanl-1 proc (proc a (stream-car s)) (stream-cdr s)))))

(define (scanl-n proc a ss)
  (if (member nil ss)
      (stream-cons a nil)
      (stream-cons a (scanl-n proc
                              (apply proc a (map stream-car ss))
                              (map stream-cdr ss)))))

(define stream-scanl
  (case-lambda
   ((proc a s)
    (scanl-1 proc a s))
   ((proc a s . args)
    (scanl-n proc a (cons s args)))))

;; 巡回
(define (for-each-1 proc s)
  (unless
   (stream-empty? s)
   (proc (stream-car s))
   (for-each-1 proc (stream-cdr s))))

(define (for-each-n proc ss)
  (unless
   (member nil ss)
   (apply proc (map stream-car ss))
   (for-each-n proc (map stream-cdr ss))))

(define stream-for-each
  (case-lambda
   ((proc s)
    (for-each-1 proc s))
   ((proc s . args)
    (for-each-n proc (cons s args)))))

stream-scanl は遅延ストリーム s の先頭から畳み込みを行い、計算途中の累積値を格納した遅延ストリームを返します。stream-foldl と違って、s が無限ストリームでも動作します。stream-for-each は遅延ストリームの要素に関数 proc を適用します。これらの関数は複数の遅延ストリームを受け取ることができます。なお、stream-for-each に無限ストリームを渡すと無限ループになります。ご注意くださいませ。

簡単な実行例を示しましょう。

gosh[r7rs.user]> (define s1 (stream-unfold (lambda (x) (+ x 1)) 1))
s1
gosh[r7rs.user]> (define s2 (stream-map (lambda (x) (* x x)) s1))
s2
gosh[r7rs.user]> (stream->list (stream-take s2 10))
(1 4 9 16 25 36 49 64 81 100)
gosh[r7rs.user]> (stream->list (stream-take (stream-map cons s1 s1) 10))
((1 . 1) (2 . 2) (3 . 3) (4 . 4) (5 . 5) (6 . 6) (7 . 7) (8 . 8) (9 . 9)
 (10 . 10))
gosh[r7rs.user]> (define s3 (stream-filter even? s1))
s3
gosh[r7rs.user]> (stream->list (stream-take s3 10))
(2 4 6 8 10 12 14 16 18 20)
gosh[r7rs.user]> (stream-foldl + 0 (stream-take s1 100))
5050
gosh[r7rs.user]> (stream-foldr + 0 (stream-take s1 100))
5050
gosh[r7rs.user]> (stream-foldl + 0 (stream-take s1 10) (stream-take s1 10))
110
gosh[r7rs.user]> (stream-foldr + 0 (stream-take s1 10) (stream-take s1 10))
110
gosh[r7rs.user]> (define s5 (stream-scanl + 0 s1))
s4
gosh[r7rs.user]> (stream->list (stream-take s4 20))
(0 1 3 6 10 15 21 28 36 45 55 66 78 91 105 120 136 153 171 190)
gosh[r7rs.user]> (define s5 (stream-scanl + 0 s1 s1))
s5
gosh[r7rs.user]> (stream->list (stream-take s5 20))
(0 2 6 12 20 30 42 56 72 90 110 132 156 182 210 240 272 306 342 380)
gosh[r7rs.user]> (stream-for-each (lambda (x) (display x) (newline)) (stream-take s1 10))
1
2
3
4
5
6
7
8
9
10
#<undef>

変数 s1 に 1 から始まる整数列を生成する遅延ストリームをセットします。次に、s1 の要素を 2 乗する遅延ストリームを stream-map で生成して変数 s2 にセットします。stream-take で s2 から要素を 10 個取り出すと、s1 の要素を 2 乗した値になります。

s1 から偶数列の遅延ストリームを得るには、引数が偶数のときに真を返す述語を stream-filter に渡します。その返り値を変数 s3 にセットして stream-take で 10 個の要素を取り出すと、リストの要素は 2 から 20 までの値になります。

stream-take で有限ストリームを生成すると畳み込みを行うことができます。stream-foldl と stream-foldr で要素の合計値を求めると 5050 になります。stream-scanl は無限ストリームでも動作します。stream-for-each も正常に動作していますね。

●stream-map の便利な使い方

stream-map は複数の遅延ストリームを受け取ることができるので、それらの遅延ストリームに対していろいろな処理を定義することができます。次の例を見てください。

gosh[r7rs.user]> (define (add-stream s1 s2) (stream-map + s1 s2))
add-stream
gosh[r7rs.user]> (define s1 (stream-range 1 4))
s1
gosh[r7rs.user]> (define s2 (stream-range 11 14))
s2
gosh[r7rs.user]> (define s3 (add-stream s1 s2))
s3
gosh[r7rs.user]> (stream->list s3)
(12 14 16 18)

add-stream は s1 と s2 の要素を加算した遅延ストリームを返します。この add-stream を使うと、整数を生成する遅延ストリームは次のように定義することができます。

gosh[r7rs.user]> (define ones (stream-cons 1 ones))
ones
gosh[r7rs.user]> (define ints (stream-cons 1 (add-stream ones ints)))
ints
gosh[r7rs.user]> (stream->list (stream-take ints 10))
(1 2 3 4 5 6 7 8 9 10)

ストリーム ints は、現在の ints に 1 を足し算することで整数を生成しています。これで整数が生成できるとは不思議ですね。ints の動作を図に示すと、次のようになります。

ones = Cons(1, delay ones)
     = Cons(1, lazy_obj1)

ints = Cons(1, delay (add_stream ones ints))
     = Cons(1, lazy_obj2)

lazy_obj2 = Cons(1 + 1, delay (add_stream (force lazy_obj1) (force lazy_obj2)))
          => Cons(2, delay (add_stream (force lazy_obj1) (force lazy_obj2)))
          => Cons(2, lazy_obj3)

lazy_obj3 => Cons(3, lazy (add_stream (force lazy_obj1) (force lazy_obj3)))
          => Cons(3, lazy_obj4)


        図 : ストリーム ints の動作

ones を Cons(1, lazy_obj1) と表し、ints を Cons(1, lazy_obj2) と表します。lazy_obj はプロミスを表します。ints で次の要素を生成するとき、lazy_obj2 を force します。すると、add_stream (stream_map2) に ones と ints が適用され、ストリームの要素 2 とプロミス lazy_obj3 が生成されます。このとき、lazy_obj3 の内容は add_stream (force lazy_obj1) (force lazy_obj2) になります。

次の要素を生成するときは、lazy_obj3 を force します。lazy_obj1 は Cons(1, lazy_obj1) に、lazy_obj2 は Cons(2, lazy_obj3) に評価されるので、ストリームの要素は 1 + 2 = 3 になり、プロミス lazy_obj4 の内容は add_stream (force lazy_obj1) (force lazy_obj3) になります。そして、このプロミスを force することで次の要素を求めることができます。

このように、プロミスの中に現時点の整数を保持しておき、そこに 1 を足し算することで整数列を生成しているわけです。ここで、プロミスは評価結果をキャッシュしているので、整数 n の次の値を簡単に計算できることに注意してください。もしも、プロミスを単純なクロージャで実装した場合、整数 n を求めるため再計算が行われるので、効率はとても悪くなります。

同様の方法でフィボナッチ数列を生成するストリームを定義することができます。

リスト : フィボナッチ数列の生成

(define fibs (stream-cons 0 (stream-cons 1 (add-stream (stream-cdr fibs) fibs))))

fibs が現在のフィボナッチ数列を表していて、(stream-cdr fibs) で次の要素を求めます。そして、それらを足し算することで、その次の要素を求めています。この場合、ストリームの初期値として 2 つの要素が必要になることに注意してください。

それでは実行してみましょう。

gosh[r7rs.user]> (define fibs (stream-cons 0 (stream-cons 1 (add-stream (stream-cdr fibs) fibs))))
fibs
gosh[r7rs.user]> (stream->list (stream-take fibs 20))
(0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181)

このように、2 つのストリームを使ってフィボナッチ数列を生成することができます。

●stream-flatmap

次は高階関数 stream-flatmap を作りましょう。stream-flatmap を定義する場合、次のように stream-append を使うと問題が発生します。

リスト : stream-flatmap の定義 (間違い)

(define (stream-flatmap proc s)
  (if (stream-empty? s)
      nil
      (stream-append (proc (stream-car s))
                     (stream-flatmap proc (stream-cdr s)))))

Scheme の関数は正格評価なので、stream-append を実行する前に引数が評価されます。つまり、stream-flatmap の評価は遅延されるのではなく、遅延ストリームが空になるまで stream-flatmap が再帰呼び出しされるのです。これでは無限ストリームに対応することができません。

そこで、引数を遅延評価する関数 stream-append-delay を作ることにします。プログラムは次のようになります。

リスト : stream-append-delay と stream-flatmap

;; 遅延ストリームの連結 (遅延評価版)
(define (stream-append-delay s1 s2)
  (if (stream-empty? s1)
      (force s2)
      (stream-cons (stream-car s1)
                   (stream-append-delay (stream-cdr s1) s2))))

;; マッピングの結果を平坦化する
(define (stream-flatmap proc s)
  (if (stream-empty? s)
      nil
      (stream-append-delay (proc (stream-car s))
                           (delay (stream-flatmap proc (stream-cdr s))))))

stream-append-delay は stream-append とほぼ同じですが、遅延ストリーム s1 が空になったらプロミス s2 を force で評価するところが異なります。stream-flatmap では、stream-appned のかわりに stream-append-delay を使います。このとき、delay で生成したプロミスを引数に渡します。

それでは実行してみましょう。

gosh[r7rs.user]> (define s1 (stream-unfold (lambda (x) (+ x 1)) 1))
s1
gosh[r7rs.user]> (define s2 (stream-flatmap (lambda (x) (list->stream (make-list x x))) s1))
s2
gosh[r7rs.user]> (stream->list (stream-take s2 45))
(1 2 2 3 3 3 4 4 4 4 5 5 5 5 5 6 6 6 6 6 6 7 7 7 7 7 7 7 8 8 8 8 8 8 8 8 9 9 9 9 9 9 9 9 9)

s1 は無限ストリームになりますが、stream-flatmap は正常に動作していますね。

●stream-take-while と stream-drop-while

次は、遅延ストリームの先頭から述語が真を返す要素を取り出す stream-take-while と要素を取り除く stream-drop-while を作ります。

リスト : stream-take-while と stream-drop-while

(define (stream-take-while pred s)
  (if (or (stream-empty? s) (not (pred (stream-car s))))
      nil
      (stream-cons (stream-car s)
                   (stream-take-while pred (stream-cdr s)))))

(define (stream-drop-while pred s)
  (if (or (stream-empty? s) (not (pred (stream-car s))))
      s
      (stream-drop-while pred (stream-cdr s))))

どちらの関数も難しいところはないと思います。簡単な実行例を示しましょう。

gosh[r7rs.user]> (import (mylib lazystream))
gosh[r7rs.user]> (define s1 (stream-unfold (lambda (x) (+ x 1)) 1))
s1
gosh[r7rs.user]> (stream->list (stream-take-while (lambda (x) (< x 10)) s1))
(1 2 3 4 5 6 7 8 9)
gosh[r7rs.user]> (define s2 (stream-drop-while (lambda (x) (< x 10)) s1))
s2
gosh[r7rs.user]> (stream->list (stream-take s2 10))
(10 11 12 13 14 15 16 17 18 19)

●エラトステネスの篩

最後に簡単な例題として、ストリームを使って素数を求めるプログラムを作ってみましょう。なお、Gauche には素数を取り扱うライブラリ math.prime が用意されているので、私たちがプログラムを作る必要はありませんが、Scheme のお勉強ということで、あえてプログラムを作ることにします。

考え方は簡単です。最初に、2 から始まる整数列を生成するストリームを用意します。2 は素数なので、素数ストリームの要素になります。次に、この整数列から 2 で割り切れる整数を取り除き除きます。これは stream-filter を使うと簡単です。2 で割り切れる整数が取り除かれたので、次の要素は 3 になります。今度は 3 で割り切れる整数を取り除けばいいのです。これも stream-filter を使えば簡単です。このとき、入力用のストリームは 2 で割り切れる整数が取り除かれています。したがって、このストリームに対して 3 で割り切れる整数を取り除くように stream-filter を設定すればいいわけです。

このように、素数を見つけたらそれで割り切れる整数を取り除いていくアルゴリズムを「エラトステネスの篩」といいます。ようするに、2 から始まる整数ストリームに対して、見つけた素数 2, 3, 5, 7, 11, ... を順番に stream-fiter で設定して素数でない整数をふるい落としていくわけです。

プログラムは次のようになります。

リスト : 素数の生成

(define (sieve s)
  (stream-cons (stream-car s)
               (sieve (stream-filter
                        (lambda (x) (not (zero? (modulo x (stream-car s)))))
                        (stream-cdr s)))))

sieve には 2 から始まる整数列を生成するストリームを渡します。stream-cdr でプロミスを force すると、stream-filter により整数列から 2 で割り切れる整数を取り除いたストリームが返されます。次の要素 3 を取り出すとき、このストリームに対して 3 で割り切れる整数を取り除くことになるので、2 と 3 で割り切れる整数が取り除かれることになります。次の要素は 5 になりますが、そのストリームからさらに 5 で割り切れる整数が stream-filter で取り除かれることになります。

このように stream-filter を重ねて設定していくことで、素数でない整数をふるい落としていくことができるわけです。それでは実行してみましょう。

gosh[r7rs.user]> (define primes (sieve (stream-unfold (lambda (x) (+ x 1)) 2)))
primes
gosh[r7rs.user]> (stream->list (stream-take primes 25))
(2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97)
gosh[r7rs.user]> (stream-for-each (lambda (x) (display x) (display " ")) (stream-take primes 100))
2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103 107 109 113 127 131 137 
139 149 151 157 163 167 173 179 181 191 193 197 199 211 223 227 229 233 239 241 251 257 263 269 271 277 
281 283 293 307 311 313 317 331 337 347 349 353 359 367 373 379 383 389 397 401 409 419 421 431 433 439 
443 449 457 461 463 467 479 487 491 499 503 509 521 523 541 #<undef>

stream-unfold で 2 から始まる整数列を sieve に渡します。100 以下の素数は全部で 25 個あります。

●より高速な方法

関数 sieve は簡単にプログラムできますが、生成する素数の個数が多くなると、その実行速度はかなり遅くなります。実をいうと、sieve なみに簡単で sieve よりも高速な方法があります。

整数 n が素数か確かめる簡単な方法は、√n 以下の素数で割り切れるか試してみることです。割り切れる素数 m があれば、n は素数ではありません。そうでなければ、n は素数であることがわかります。

これをそのままプログラムすると次のようになります。

リスト : 素数列の生成

(define *primes*
  (stream-cons 2 (stream-cons 3 (stream-cons 5 (primes-from 7)))))

(define (primes-from n)
  (if (prime? n)
      (stream-cons n (primes-from (+ n 2)))
      (primes-from (+ n 2))))

(define (prime? n)
  (let loop ((s (stream-cdr *primes*)))
    (let ((p (stream-car s)))
      (cond
       ((> (* p p) n) #t)
       ((zero? (modulo n p)) #f)
       (else
        (loop (stream-cdr s)))))))

変数 primes は無限の素数列を表します。実際に素数を生成する処理は関数 primes-from で行います。primes-from は関数 prime? を呼び出して n が素数かチェックします。そうであれば、stream-cons で n を遅延ストリームに追加します。そうでなければ primes-from を再帰呼び出しするだけです。偶数は素数ではないので、引数 n には奇数を与えていることに注意してください。

prime? も簡単です。named-let で primes から √n 以下の素数 p を取り出します。√n 以下の素数は生成済みなので、primes から取り出すことが可能です。ここでは√n のかわりに条件を p * p <= n としています。あとは、取り出した素数 p で n が割り切れないことを確認するだけです。

それでは実行してみましょう。

gosh[r7rs.user]> (stream-take primes 25)
(2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97)
gosh[r7rs.user]> (stream-ref primes 99)
541
gosh[r7rs.user]> (stream-ref primes 500)
3581

100 以下の素数は全部で 25 個あります。また、100 番目の素数は 541 になります。Scheme のリストは 0 から数えるので、(stream-ref primes 99) で 100 番目の素数になります。

実行時間ですが、stream-ref で 1001 番目の素数を求めてみました。実行環境は Ubunts 18.04 (Windows Subsystem for Linux), Intel Core i5-6200U 2.30GHz, Gauche version 0.9.9 です。

リスト : 簡単なテスト

(define (test ps n)
  (let ((s (current-jiffy)))
    (display (stream-ref ps n))
    (newline)
    (inexact (/ (- (current-jiffy) s) (jiffies-per-second)))))
gosh[r7rs.user]> (define s (sieve (stream-unfold (lambda (x) (+ x 1)) 2)))
s
gosh[r7rs.user]> (test s 1000)
7927
1.5443298
gosh[r7rs.user]> (test *primes* 1000)
7927
0.0054301

sieve よりも primes のほうが高速になりました。興味のある方はいろいろ試してみてください。

●参考文献, URL

  1. 計算機プログラムの構造と解釈 第二版 3.5 ストリーム
  2. Gauche ユーザリファレンス: 6.19 遅延評価

初版 2009 年 6 月 13 日
改訂 2017 年 2 月 5 日
改訂 2020 年 10 月 4 日

Copyright (C) 2009-2020 Makoto Hiroi
All rights reserved.

[ PrevPage | Scheme | NextPage ]