M.Hiroi's Home Page

Go Language Programming

お気楽 Go 言語プログラミング入門

[ PrevPage | Golang | NextPage ]

電卓プログラムの改良 (遅延評価編その2)

今回は遅延評価の応用例として、電卓プログラムで「遅延ストリーム」を作ってみましょう。

●遅延ストリームの構造

ストリーム (stream) はデータの流れを抽象化したデータ構造です。たとえば、ファイル入出力はストリームと考えることができます。また、リストを使ってストリームを表すこともできます。ただし、単純なリストでは有限個のデータの流れしか表すことができません。ところが、遅延評価を用いると擬似的に無限個のデータを表すことができるようになります。これを「遅延ストリーム」とか「遅延リスト」と呼びます。

遅延ストリームの基本的な考え方は、必要になったときに新しいデータを生成することです。このときに遅延評価を用います。具体的にはデータを生成する関数を用意し、それを遅延評価してストリームに格納しておきます。そして、必要になった時点で遅延評価しておいた関数を呼び出して値を求めればよいわけです。

今回は遅延ストリームをコンスセルで表すことにします。コンスセルの CAR が現時点での先頭データを表し、CDR が遅延ストリームを生成する関数を格納するプロミスです。次のリストを見てください。

リスト : 基本操作

// ストリームの生成
streamCons = cons;

// 要素を取り出す
def streamCar(s) car(s) end

// 次の要素を取り出す
def streamCdr(s) force(cdr(s)) end

// ストリームは空か
def isEmpty(s) null(s) end

ストリームの生成は関数 streamCons で行います。これは cons に別名をつけただけです。streamCons の第 1 引数がストリームの要素で、第 2 引数が delay で生成したプロミスになります。プロミスにはストリームを生成する関数を格納します。プロミスを force することで、次の要素を格納した遅延ストリームを生成します。ストリームの終端は nil で表すことにします。遅延ストリームの終端を判定する述語 isEmpty を用意します。isEmpty は null と同じです。

関数 streamCar は遅延ストリーム s から要素を取り出して返します。関数 streamCdr は s のプロミスを force して、次の要素を格納した遅延ストリームを生成して返します。ようするに、これらの関数はリスト操作の cons, car, cdr に対応しているわけです。

●遅延ストリームの生成

それでは、遅延ストリームを生成する関数を作りましょう。たとえば、low から high までの整数列を生成するストリームは次のようにプログラムすることができます。

リスト : 整数列を生成するストリーム

def intgen(low, high)
  if low > high then
    nil
  else
    streamCons(low, delay(intgen(low + 1, high)))
  end
end

関数 intgen は遅延ストリームを生成して返します。streamCons の第 1 要素が現時点でのデータになります。第 2 要素のプロミスを force すると、intgen(low + 1, high) が実行されて次のデータを格納した遅延ストリームが返されます。そして、その中のプロミスを force すると、その次のデータを得ることができます。

簡単な例を示しましょう。

Calc> s1 = intgen(1, 10);
(1 . <Promise>)
Calc> streamCar(s1);
1
Calc> s2 = streamCdr(s1);
(2 . <Promise>)
Calc> streamCar(s2);
2
Calc> s3 = streamCdr(s2);
(3 . <Promise>)
Calc> streamCar(s3);
3
Calc> streamCar(s1);
1
Calc> streamCar(s2);
2
Calc> streamCar(s3);
3

このように、第 2 要素のプロミスを force することで、次々とデータを生成することができます。ここで、遅延ストリームからデータを取り出しても、先頭要素は取り除かれていないことに注意してください。たとえば、streamCdr(s1) は force でプロミスを評価しますが、その返り値は新しい遅延ストリームです。変数 s1 を書き換えない限り、ストリーム s1 の先頭要素は 1 のままです。

拙作のページ 並行プログラミング (2) では goroutine を使って整数列を生成しました。この場合、チャネルからデータを読み込むと、そのデータは取り除かれてしまいます。同じ数列を何度も使いたい場合は、その都度 goroutine を生成する必要があります。これに対し、遅延ストリーム s1 は s1 の値を書き換えない限り、何度でも 1 から始まる数列を生成することができます。

もう一つ、簡単な例を示しましょう。フィボナッチ数列を生成する遅延ストリームを作ります。次のリストを見てください。

リスト : フィボナッチ数列を生成する遅延ストリーム

def fibogen(a, b) 
  streamCons(a, delay(fibogen(b, a + b)))
end

関数 fibgen の引数 a がフィボナッチ数列の最初の項で、b が次の項です。したがって、プロミスに fibgen(b, a + b) を格納しておけば、force することでフィボナッチ数列を生成することができます。なお、この関数は無限ストリームを生成しますが、電卓プログラムの整数 (int64) には上限値があるので、際限なくフィボナッチ数列を生成できるわけではありません。ご注意ください。

それでは実行してみましょう。

Calc> s = fibogen(0, 1);
(0 . <Promise>)
Calc> streamCar(s);
0
Calc> s = streamCdr(s);
(1 . <Promise>)
Calc> streamCar(s);
1
Calc> s = streamCdr(s);
(1 . <Promise>)
Calc> streamCar(s);
1
Calc> s = streamCdr(s);
(2 . <Promise>)
Calc> streamCar(s);
2
Calc> s = streamCdr(s);
(3 . <Promise>)
Calc> streamCar(s);
3
Calc> s = streamCdr(s);
(5 . <Promise>)
Calc> streamCar(s);
5

変数 s の値を streamCdr の返り値で上書きしているので、streamCdr(s) を実行するたびにフィボナッチ数列を求めることができます。

●遅延ストリームの操作関数

次は遅延ストリームを操作する関数を作りましょう。最初は n 番目の要素を求める関数 streamRef です。

リスト : n 番目の要素を求める

def streamRef(s, n)
  while n > 0 and !isEmpty(s) do
    s = streamCdr(s),
    n = n - 1
  end,
  streamCar(s)
end

streamRef は while ループで streamCdr を n 回繰り返すことで n 番目の要素を求めます。

ストリームから n 個の要素を取り出してリストに格納して返す関数 streamTake も同様にプログラムすることができます。

リスト : n 個の要素を取り出す

def streamTake(s, n)
  if n == 0 or isEmpty(s) then
    nil
  else
    cons(streamCar(s), streamTake(streamCdr(s), n - 1))
  end
end

streamCar で遅延ストリームの要素を取り出し、それを streamTake の返り値 (リスト) の先頭に cons で追加します。streamTake を再帰呼び出しするときは、streamCdr で次の要素を求めていることに注意してください。n が 0 または遅延ストリームが終端に達した場合は nil を返します。

それでは、簡単な実行例を示しましょう。

Calc> s = fibogen(0, 1);
(0 . <Promise>)
Calc> streamRef(s, 0);
0
Calc> streamRef(s, 1);
1
Calc> streamRef(s, 2);
1
Calc> streamRef(s, 3);
2
Calc> streamRef(s, 4);
3
Calc> streamRef(s, 5);
5
Calc> streamTake(s, 10);
(0 1 1 2 3 5 8 13 21 34)
Calc> streamTake(s, 20);
(0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181)

変数 s にフィボナッチ数列を生成するストリームをセットします。streamRef で順番に要素を取り出すと、その値はフィボナッチ数列になっていますね。同様に、streamTake で要素を取り出すと、リストの要素はフィボナッチ数列になります。streamRef と streamTake の引数 s は局所変数なので、その値を書き換えたとしても、大域変数 s の値が変わることはありません。大域変数 s のストリームは 1 から始まるフィボナッチ数列のままです。

●高階関数

ところで、遅延ストリームは高階関数も定義することができます。次のリストを見てください。

リスト : 高階関数

// マッピング
def streamMap(f, s)
  if isEmpty(s) then
    nil
  else
    streamCons(f(streamCar(s)), delay(streamMap(f, streamCdr(s))))
  end
end

// フィルター
def streamFilter(f, s)
  let r = nil in
    while !isEmpty(s) and null(r) do
      if f(streamCar(s)) then
        r = streamCons(streamCar(s), delay(streamFilter(f, streamCdr(s))))
      else
        s = streamCdr(s)
      end
    end,
    r
  end
end

streamMap と streamFilter は関数と遅延ストリームを受け取り、新しい遅延ストリームを生成して返します。streamMap は引数のストリームの要素に関数 f を適用した結果を新しいストリームに格納して返します。streamFilter は述語 f が真を返す要素だけを新しいストリームに格納して返します。

簡単な実行例を示しましょう。

Calc> s1 = intgen(1, 100);
(1 . <Promise>)
Calc> s2 = streamMap(fn(x) x * x end, s1);
(1 . <Promise>)
Calc> streamTake(s2, 10);
(1 4 9 16 25 36 49 64 81 100)
Calc> s3 = streamFilter(isEven, s1);
(2 . <Promise>)
Calc> streamTake(s3, 10);
(2 4 6 8 10 12 14 16 18 20)

変数 s1 に 1 から始まる整数列を生成するストリームをセットします。次に、s1 の要素を 2 乗するストリームを streamMap で生成して変数 s2 にセットします。streamTake で s2 から要素を 10 個取り出すと、s1 の要素を 2 乗した値になります。

s1 から偶数列のストリームを得るには、引数が偶数のときに真を返す述語 isEven を streamFilter に渡します。その返り値を変数 s3 にセットして、streamTake で 10 個の要素を取り出すと、リストの要素は 2 から 20 までの値になります。

●遅延ストリームの操作関数 (2)

次は、2 つの遅延ストリームを受け取って 1 つのストリームを返す関数を考えてみましょう。一番簡単な操作は 2 つのストリームを結合することです。次のリストを見てください。

リスト : 遅延ストリームの結合

def streamAppend(s1, s2)
  if isEmpty(s1) then
    s2
  else
    streamCons(streamCar(s1), delay(streamAppend(streamCdr(s1), s2)))
  end
end

関数 streamAppend はストリーム s1 と s2 を結合したストリームを返します。処理は簡単で、s1 の要素を順番に取り出していき、s1 が空になったら s2 を返すだけです。

簡単な実行例を示しましょう。

Calc> s1 = intgen(1, 4);
(1 . <Promise>)
Calc> s2 = intgen(11, 14);
(11 . <Promise>)
Calc> s3 = streamAppend(s1, s2);
(1 . <Promise>)
Calc> streamTake(s3, 8);
(1 2 3 4 11 12 13 14)

次はストリーム s1 と s2 の要素を交互に出力するストリームを作ります。次のリストを見てください。

リスト : ストリームの要素を交互に出力

def interleave(s1, s2)
  if isEmpty(s1) then
    s2
  else
    streamCons(streamCar(s1), delay(interleave(s2, streamCdr(s1))))
  end
end

関数 interleave はストリーム s1 と s2 を受け取ります。そして、s1 の要素を新しいストリームに格納したら、次は s2 の要素を新しいストリームに格納します。これは interleave を呼び出すとき、引数 s1 と s2 の順番を交換するだけです。これで s1 と s2 の要素を交互に出力することができます。

簡単な実行例を示しましょう。

Calc> s4 = interleave(s1, s2);
(1 . <Promise>)
Calc> streamTake(s4, 8);
(1 11 2 12 3 13 4 14)

streamAppend の場合、無限ストリームを結合することはできませんが、interleave ならば無限ストリームにも対応することができます。簡単な例を示しましょう。

Calc> ones = streamCons(1, delay(ones));
(1 . <Promise>)
Calc> streamTake(ones, 10);
(1 1 1 1 1 1 1 1 1 1)
Calc> twos = streamCons(2, delay(twos));
(2 . <Promise>)
Calc> streamTake(twos, 10);
(2 2 2 2 2 2 2 2 2 2)
Calc> streamTake(interleave(ones, twos), 10);
(1 2 1 2 1 2 1 2 1 2)

ones は 1 を無限に出力するストリームで、twos は 2 を無限に出力するストリームです。streamAppend で ones と twos を結合しても無限に 1 を出力するだけですが、interleave で ones と twos を結合すれば、1 と 2 を交互に出力することができます。これで無限ストリームの要素を混ぜ合わせることができます。

●高階関数 (2)

2 つのストリームを受け取るマップ関数 streamMap2 も簡単です。プログラムは次のようになります。

リスト : マップ関数

def streamMap2(f, s1, s2)
  if isEmpty(s1) or isEmpty(s2) then
    nil
  else
    streamCons(f(streamCar(s1), streamCar(s2)),
               delay(streamMap2(f, streamCdr(s1), streamCdr(s2))))
  end
end

ストリーム s1 と s2 から要素 x, y を取り出し、f(x, y) の評価結果を新しいストリームに格納します。

streamMap2 を使うと、ストリームに対していろいろな処理を定義することができます。次の例を見てください。

リスト : 2 つのストリームの要素を加算する

def streamAdd(s1, s2)
  streamMap2(fn(x, y) x + y end, s1, s2)
end
Calc> s1 = intgen(1, 4);
(1 . <Promise>)
Calc> s2 = intgen(11, 14);
(11 . <Promise>)
Calc> s3 = streamAdd(s1, s2);
(12 . <Promise>)
Calc> streamTake(s3, 4);
(12 14 16 18)

streamAdd は s1 と s2 の要素を加算したストリームを返します。この streamAdd を使うと、整数を生成するストリームは次のように定義することができます。

Calc> ones = streamCons(1, delay(ones));
(1 . <Promise>)
Calc> ints = streamCons(1, delay(streamAdd(ones, ints)));
(1 . <Promise>)
Calc> streamTake(ints, 30);
(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30)

同様の方法でフィボナッチ数列を生成するストリームを定義することができます。

Calc> fibs = streamCons(0, delay(streamCons(1, delay(streamAdd(streamCdr(fibs), fibs)))));
(0 . <Promise>)
Calc> streamTake(fibs, 30);
(0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765 10946 17711
28657 46368 75025 121393 196418 317811 514229)

ストリーム ints は、現在の ints に 1 を足し算することで整数を生成しています。fibs は現在のフィボナッチ数列を表していて、streamCdr(fibs) で次の要素を求め、それらを足し算することで、その次の要素を求めています。この場合、ストリームの初期値として 2 つの要素が必要になることに注意してください。

ここで、プロミスは評価結果をキャッシュしていることに注意してください。もしも、プロミスを単純なクロージャで実装した場合、次の値を求めるため再計算が必ず行われるので、効率がとても悪くなる場合があります。

●組 (pair) を生成するストリーム

それでは簡単な例題として、2 つのストリームからその要素の組み合わせを生成するストリームを作りましょう。要素が n 個のストリームの場合、組み合わせは n * n 個あります。次の図を見てください。

(a0, b0) (a0, b1) (a0, b2) ... (a0, bn)
(a1, b0) (a1, b1) (a1, b2) ... (a1, bn)
(a2, b0) (a2, b1) (a2, b2) ... (a2, bn)

                           ...

(an, b0) (an, b1) (an, b2) ... (an, bn)


        図 : n * n 個の組

この組み合わせを生成するストリームは簡単にプログラムできるように思います。次のリストを見てください。

リスト : 組を生成するストリーム

def pairStream(s1, s2)
  if isEmpty(s1) then
    nil
  else
    streamAppend(streamMap(fn(x) list(streamCar(s1), x) end, s2),
                 pairStream(streamCdr(s1), s2))
  end
end

関数 pairStream はストリーム s1 と s2 の要素の組を出力します。最初に、s1 の要素 x を取り出して、streamMap で x と s2 の要素の組を生成します。それを streamAppend で出力してから、pairStream を再帰呼び出しして s1 の次の要素と s2 の組を求めます。とても簡単なプログラムですが、実は重大な欠点があります。これはあとで説明します。

それでは実際に実行してみましょう。

Calc> s = pairStream(intgen(1, 4), intgen(11, 14));
((1 11) . <Promise>)
Calc> streamTake(s, 16);
((1 11) (1 12) (1 13) (1 14) (2 11) (2 12) (2 13) (2 14) (3 11) (3 12) (3 13)
 (3 14) (4 11) (4 12) (4 13) (4 14))

正常に動作しているように見えますが、実は遅延ストリームとしては機能していないのです。streamMap に渡す匿名関数で生成した組を表示すると、次のようになります。

Calc> s = pairStream(intgen(1, 4), intgen(11, 14));
(1 11)
(2 11)
(3 11)
(4 11)
((1 11) . <Promise>)
Calc> streamTake(s, 16);
(1 12)
(1 13)
(1 14)
(2 12)
(2 13)
(2 14)
(3 12)
(3 13)
(3 14)
(4 12)
(4 13)
(4 14)
((1 11) (1 12) (1 13) (1 14) (2 11) (2 12) (2 13) (2 14) (3 11) (3 12) (3 13)
 (3 14) (4 11) (4 12) (4 13) (4 14))

最初に 4 つの組が生成されています。これは streamAppend の第 2 引数で pairStream を再帰呼び出ししているために起こります。電卓プログラムの関数は「値呼び」なので、引数は必ず評価されます。したがって、streamAppend を評価する前に引数である streamMap と pairStream が評価され、s1 の要素 1, 2, 3, 4 に対応するストリームが生成されるのです。

このような場合、streamAppend の第 2 引数を遅延評価するとうまくいきます。次のリストを見てください。

リスト : ストリームの結合 (遅延評価版)

def streamAppendDelay(s1, s2)
  if isEmpty(s1) then
    force(s2)
  else
    streamCons(streamCar(s1),
               delay(streamAppendDelay(streamCdr(s1), s2)))
  end
end

関数 streamAppendDelay の引数 s2 にはプロミスが渡されるので、s1 が空になったら force(s2) を返すようにします。

次は pairStream を修正します。

リスト : 組を生成するストリーム

def pairStream1(s1, s2)
  if isEmpty(s1) then
    nil
  else
    streamAppendDelay(streamMap(fn(x) list(streamCar(s1), x) end, s2),
                      delay(pairStream1(streamCdr(s1), s2)))
  end
end

streamAppend のかわりに streamAppendDelay を使い、その第 2 引数を delay で遅延評価します。プログラムの修正はこれだけです。

それでは実行してみましょう。なお、動作を確認するため、生成した組を表示するようにプログラムを修正しています。

Calc> s = pairStream1(intgen(1, 4), intgen(11, 14));
(1 11)
((1 11) . <Promise>)
Calc> streamTake(s, 16);
(1 12)
(1 13)
(1 14)
(2 11)
(2 12)
(2 13)
(2 14)
(3 11)
(3 12)
(3 13)
(3 14)
(4 11)
(4 12)
(4 13)
(4 14)
((1 11) (1 12) (1 13) (1 14) (2 11) (2 12) (2 13) (2 14) (3 11) (3 12) (3 13)
 (3 14) (4 11) (4 12) (4 13) (4 14))

正常に動作していますね。

●無限ストリームで組 (pair) を生成する場合

ところで、pairStream は無限ストリームに対応していません。実際、引数 s2 に無限ストリームを渡した場合、引数 s1 の最初の要素を a0 とすると (a0, s2 の要素) という組しか生成されません。そこで、下図に示すように、対角線上に組を生成していくことにします。

   | a0  a1  a2  a3  a4  a5
---+-----------------------------
b0 | 0   1   3   6   10  15  ...
   |
b1 | 2   4   7   11  16  ...
   |
b2 | 5   8   12  17  ...
   |
b3 | 9   13  18  ...
   |
b4 | 14  19  ...
   |
b5 | 20 ...
   |
   | ...
   |

図 : 無限ストリームによる組の生成

上図を見ればおわかりのように、対角線の要素数を n とすると、組は (an-1, b0), (an-2, b1), ..., (a1, bn-2), (a0, bn-1) となっています。これは、s1 から n 個の要素を取り出したリストと、s2 から n 個の要素を取り出して反転したリストを、map list でまとめた形になっています。これをプログラムすると次のようになります。

リスト : 無限ストリームによる組の生成 (2)

def streamOfList(xs)
  if null(xs) then
    nil
  else
    streamCons(car(xs), delay(streamOfList(cdr(xs))))
  end
end

def pairStreamSub(s1, s2, n)
  streamAppendDelay(
    streamMap2(fn(x, y) list(x, y) end,
               streamOfList(streamTake(s1, n)),
               streamOfList(reverse(streamTake(s2, n)))),
    delay(pairStreamSub(s1, s2, n + 1)))
end

def pairStream2(s1, s2)
  pairStreamSub(s1, s2, 1)
end

関数 streamOfList は引数のリストをストリームに変換します。実際の処理は関数 pairStreamSub で行っています。引数 n が対角線上の要素数を表します。streamTake で s1 と s2 から要素を取り出し、s2 から取り出したリストを reverse で反転してから streamOfList でストリームに変換します。それを streamMap2 に渡して、ストリームの要素を list に格納したストリームを返します。

あとは、そのストリームと pairStreamSub の返り値を streamAppendDelay で連結します。これで組を対角線上の順番で生成することができます。

それでは実行してみましょう。

Calc> ones = streamCons(1, delay(ones));
(1 . <Promise>)
Calc> ints = streamCons(1, delay(streamAdd(ones, ints)));
(1 . <Promise>)
Calc> s = pairStream2(ints, ints);
((1 1) . <Promise>)
Calc> streamTake(s, 25);
((1 1) (1 2) (2 1) (1 3) (2 2) (3 1) (1 4) (2 3) (3 2) (4 1) (1 5) (2 4) (3 3)
 (4 2) (5 1) (1 6) (2 5) (3 4) (4 3) (5 2) (6 1) (1 7) (2 6) (3 5) (4 4))
Calc> streamTake(s, 40);
((1 1) (1 2) (2 1) (1 3) (2 2) (3 1) (1 4) (2 3) (3 2) (4 1) (1 5) (2 4) (3 3)
 (4 2) (5 1) (1 6) (2 5) (3 4) (4 3) (5 2) (6 1) (1 7) (2 6) (3 5) (4 4) (5 3)
 (6 2) (7 1) (1 8) (2 7) (3 6) (4 5) (5 4) (6 3) (7 2) (8 1) (1 9) (2 8) (3 7)
 (4 6))

●素数の生成

次はストリームを使って素数を求めるプログラムを作ってみましょう。アルゴリズムに「エラトステネスの篩」を用いる場合、プログラムは次のようになります。

リスト : 素数の生成

def streamSieve(s)
  let
    f = fn(x) x % streamCar(s) != 0 end
  in
    streamCons(streamCar(s), delay(streamSieve(streamFilter(f, streamCdr(s)))))
  end
end

streamSieve には 2 から始まる整数列を生成するストリームを渡します。プロミスを評価すると、streamFilter により整数列から 2 で割り切れる整数を取り除いたストリームが返されます。次の要素 3 を取り出すとき、このストリームに対して 3 で割り切れる整数を取り除くことになるので、2 と 3 で割り切れる整数が取り除かれることになります。次の要素は 5 になりますが、そのストリームからさらに 5 で割り切れる整数が streamFilter で取り除かれることになります。

このように streamFilter が設定されていくことで、素数でない整数をふるい落としていくことができるわけです。それでは実行してみましょう。

Calc> s = streamSieve(intgen(2, 1000));
(2 . <Promise>)
Calc> streamTake(s, 25);
(2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97)

正常に動作していますね。

●streamSieve の改良

関数 streamSieve は簡単にプログラムできますが、生成する素数の個数が多くなると、その実行速度はかなり遅くなります。実をいうと、streamSieve なみに簡単で streamSieve よりも高速な方法があります。

整数 n が素数か確かめる簡単な方法は、√n 以下の素数で割り切れるか試してみることです。割り切れる素数 m があれば、n は素数ではありません。そうでなければ、n は素数であることがわかります。

これをそのままプログラムすると次のようになります。

リスト : 素数列の生成

// f を満たす要素を取り出す
def streamTakeWhile(f, s)
  if isEmpty(s) or !f(streamCar(s)) then
    nil
  else
    cons(streamCar(s), streamTakeWhile(f, streamCdr(s)))
  end
end

// dummy
def primesFrom(n) n end

// 素数列 (遅延ストリーム)
primes = streamCons(2, delay(streamCons(3, delay(streamCons(5, delay(primesFrom(7)))))));

// n は素数か
def isPrime(n)
  every(fn(x) n % x != 0 end, streamTakeWhile(fn(x) x * x <= n end, primes))
end

// 素数の生成
def primesFrom(n)
  while !isPrime(n) do
    n = n + 2
  end,
  streamCons(n, delay(primesFrom(n + 2)))
end

関数 streamTakeWhile は、述語 f が真を返す要素が続く限り、遅延ストリーム xs から要素を取り出してリストに格納して返します。変数 primes は無限の素数列を表します。実際に素数を生成する処理は関数 primesFrom で行います。primesFrom は簡単で、関数 isPrime を呼び出して n が素数かチェックします。primesFrom を呼び出すときは素数列 primes を渡すことに注意してください。そうであれば、streamCons で n を遅延ストリームに追加します。そうでなければプロミスで primesFrom を呼び出すだけです。偶数は素数ではないので、引数 n には奇数を与えていることに注意してください。

isPrime も簡単です。streamTakeWhile で xs (素数列 primes) から √n 以下の素数列を取り出します。√n 以下の素数は生成済みなので、xs から streamTakeWhile で取り出すことが可能です。ここでは√n のかわりに条件を x * x <= n としています。あとは、関数 every を使って、取り出した素数で n が割り切れないことを確認するだけです。

それでは実行してみましょう。

Calc> streamTake(primes, 25);
(2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97)
Calc> streamRef(primes, 25 - 1);
97
Calc> streamRef(primes, 100 - 1);
541
Calc> streamRef(primes, 1000 - 1);
7919

正常に動作していますね。

実行時間を計測したところ、結果は次のようになりました。

Calc> s1 = streamSieve(streamCdr(ints));
(2 . <Promise>)
Calc> let s = clock() in println(streamRef(s1, 1000)), since(s) end;
7927
1.2945162s
Calc> let s = clock() in println(streamRef(primes, 1000)), since(s) end;
7927
81.4374ms
Calc> let s = clock() in println(streamRef(s1, 1000)), since(s) end;
7927
1.8858ms
Calc> let s = clock() in println(streamRef(primes, 1000)), since(s) end;
7927
1.6287ms

streamSieve よりも primes のほうが 10 倍以上速くなりました。2 回目はキャッシュが働くので、どちらの方法でも高速に求めることができます。

今回はここまでです。次回は電卓プログラムに「コルーチン (co-routine)」を追加してみましょう。

●参考文献

  1. "Structure and Interpretation of Computer Programs (SICP)" 3.5 Streams
  2. Gauche ユーザリファレンス: 6.18 遅延評価

●プログラムリスト

//
// lazy.cal : 遅延ストリーム
//
//            Copyright (C) 2014-2021 Makoto Hiroi
//

// 基本操作
streamCons = cons;
def streamCar(s) car(s) end
def streamCdr(s) force(cdr(s)) end
def isEmpty(s) null(s) end

// 整数列の生成
def intgen(low, high)
  if low > high then
    nil
  else
    streamCons(low, delay(intgen(low + 1, high)))
  end
end

// フィボナッチ数列
def fibogen(a, b) 
  streamCons(a, delay(fibogen(b, a + b)))
end

// n 番目の要素を求める
def streamRef(s, n)
  while n > 0 and !isEmpty(s) do
    s = streamCdr(s),
    n = n - 1
  end,
  streamCar(s)
end

// n 個の要素を取り出す
def streamTake(s, n)
  if n == 0 or isEmpty(s) then
    nil
  else
    cons(streamCar(s), streamTake(streamCdr(s), n - 1))
  end
end

// 連結
def streamAppend(s1, s2)
  if isEmpty(s1) then
    s2
  else
    streamCons(streamCar(s1), delay(streamAppend(streamCdr(s1), s2)))
  end
end

def interleave(s1, s2)
  if isEmpty(s1) then
    s2
  else
    streamCons(streamCar(s1), delay(interleave(s2, streamCdr(s1))))
  end
end

// 高階関数
def streamMap(f, s)
  if isEmpty(s) then
    nil
  else
    streamCons(f(streamCar(s)), delay(streamMap(f, streamCdr(s))))
  end
end

def streamFilter(f, s)
  let r = nil in
    while !isEmpty(s) and null(r) do
      if f(streamCar(s)) then
        r = streamCons(streamCar(s), delay(streamFilter(f, streamCdr(s))))
      else
        s = streamCdr(s)
      end
    end,
    r
  end
end

def streamMap2(f, s1, s2)
  if isEmpty(s1) or isEmpty(s2) then
    nil
  else
    streamCons(f(streamCar(s1), streamCar(s2)),
               delay(streamMap2(f, streamCdr(s1), streamCdr(s2))))
  end
end

def streamAdd(s1, s2)
  streamMap2(fn(x, y) x + y end, s1, s2)
end

// 組の生成
def pairStream(s1, s2)
  if isEmpty(s1) then
    nil
  else
    streamAppend(streamMap(fn(x) println(list(streamCar(s1), x)) end, s2),
                 pairStream(streamCdr(s1), s2))
  end
end

def streamAppendDelay(s1, s2)
  if isEmpty(s1) then
    force(s2)
  else
    streamCons(streamCar(s1),
               delay(streamAppendDelay(streamCdr(s1), s2)))
  end
end

def pairStream1(s1, s2)
  if isEmpty(s1) then
    nil
  else
    streamAppendDelay(streamMap(fn(x) println(list(streamCar(s1), x)) end, s2),
                      delay(pairStream1(streamCdr(s1), s2)))
  end
end

def streamOfList(xs)
  if null(xs) then
    nil
  else
    streamCons(car(xs), delay(streamOfList(cdr(xs))))
  end
end

def pairStreamSub(s1, s2, n)
  streamAppendDelay(
    streamMap2(fn(x, y) list(x, y) end,
               streamOfList(streamTake(s1, n)),
               streamOfList(reverse(streamTake(s2, n)))),
    delay(pairStreamSub(s1, s2, n + 1)))
end

def pairStream2(s1, s2)
  pairStreamSub(s1, s2, 1)
end

// 素数
def streamSieve(s)
  let
    f = fn(x) x % streamCar(s) != 0 end
  in
    streamCons(streamCar(s), delay(streamSieve(streamFilter(f, streamCdr(s)))))
  end
end

// f を満たす要素を取り出す
def streamTakeWhile(f, s)
  if isEmpty(s) or !f(streamCar(s)) then
    nil
  else
    cons(streamCar(s), streamTakeWhile(f, streamCdr(s)))
  end
end

// dummy
def primesFrom(n) n end

// 素数列 (遅延ストリーム)
primes = streamCons(2, delay(streamCons(3, delay(streamCons(5, delay(primesFrom(7)))))));

// n は素数か
def isPrime(n)
  every(fn(x) n % x != 0 end, streamTakeWhile(fn(x) x * x <= n end, primes))
end

// 素数の生成
def primesFrom(n)
  while !isPrime(n) do
    n = n + 2
  end,
  streamCons(n, delay(primesFrom(n + 2)))
end

初版 2014 年 6 月 14 日
改訂 2021 年 12 月 22 日

Copyright (C) 2014-2021 Makoto Hiroi
All rights reserved.

[ PrevPage | Golang | NextPage ]