「ストリーム (stream)」はデータの流れを抽象化したデータ構造です。たとえば、ファイル入出力はストリームと考えることができます。また、配列や連結リストを使ってストリームを表すこともできます。ただし、単純な配列や連結リストでは有限個のデータの流れしか表すことができません。ところが、遅延評価を用いると擬似的に無限個のデータを表すことができるようになります。これを「遅延ストリーム」とか「遅延リスト」と呼びます。今回は遅延ストリームについて簡単に説明します。
遅延ストリームの基本的な考え方は、必要になったときに新しいデータを生成することです。このときに遅延評価を用います。具体的にはデータを生成する関数を用意し、それを遅延評価してストリームに格納しておきます。そして、必要になった時点で遅延評価しておいた関数を呼び出して値を求めればよいわけです。
今回は遅延ストリームを表すクラス LazyS<T> を作成することにします。プログラムは次のようになります。
リスト : 遅延ストリーム (delay.csx に追加) class LazyS<T> { T Car; Delay<LazyS<T>> Cdr; public LazyS(T item, Func<LazyS<T>> func) { Car = item; Cdr = new Delay(func); } // 終端 public static readonly LazyS<T> nil = new LazyS<T>(default(T), () => nil); public bool IsEmpty() { return this == nil; } public T StreamCar() { return Car; } public LazyS<T> StreamCdr() { return Cdr.Force(); } }
フィールド Car に現時点での先頭データを格納し、フィールド Cdr に遅延ストリームを生成する Delay<LazyS<T>> を格納します。これを Force() することで、次の要素を格納した遅延ストリームを生成します。nil はストリームの終端を表します。無限ストリームだけを扱うのであれば nil は必要ありません。
メソッド StreamCar() は遅延ストリームから要素を取り出して返します。メソッド StreamCdr() は遅延ストリームの Cdr を Force() して、次の要素を格納した遅延ストリームを生成して返します。ようするに、これらのメソッドは Lisp / Scheme のリスト操作関数 cons, car, cdr に対応しているわけです。
それでは、遅延ストリームを生成するメソッドを作りましょう。たとえば、start から count 個の整数列を生成するストリームは次のようにプログラムすることができます。
リスト : 整数列を生成するストリーム public static LazyS<int> Range(int start, int count) { if (count == 0) return LazyS<int>.nil; else return new LazyS<int>(start, () => Range(start + 1, count - 1)); }
メソッド Range() は遅延ストリームを生成して返します。LazyS<int> のコンストラクタの第 1 要素が現時点でのデータになります。第 2 引数にはラムダ式を渡して、その中で Range() を呼び出します。
StreamCdr() を実行すると、遅延オブジェクトに格納されたラムダ式が評価され、その本体である Range() が実行されて次のデータを格納した遅延ストリームが返されます。その遅延ストリームに対してさらに StreamCdr() を実行すると、その次のデータを得ることができます。
簡単な例を示しましょう。
$ dotnet script > #load "delay.csx" > var s1 = LazyS<int>.Range(1, 10); > while (!s1.IsEmpty()) { * Console.WriteLine("{0}", s1.StreamCar()); * s1 = s1.StreamCdr(); * } 1 2 3 4 5 6 7 8 9 10
このように、StreamCdr() を実行することで、次々とデータを生成することができます。
もう一つ、簡単な例を示しましょう。フィボナッチ数列を生成する遅延ストリームを作ります。次のリストを見てください。
リスト : フィボナッチ数列を生成する遅延ストリーム public static LazyS<long> Fibonacci(long a = 0, long b = 1) { return new LazyS<long>(a, () => Fibonacci(b, a + b)); }
メソッド Fibonacci() の引数 a がフィボナッチ数列の最初の項で、b が次の項です。したがって、ラムダ式に Fibonacci(b, a + b) を格納しておけば、StreamCdr() を実行することでフィボナッチ数列を生成することができます。なお、このメソッドは無限ストリームを生成しますが、C# の整数 (long) には上限値があるので、際限なくフィボナッチ数列を生成できるわけではありません。ご注意ください。
次は遅延ストリームを操作するメソッドを作りましょう。最初は n 番目の要素を求めるメソッド StreamRef() です。
リスト : n 番目の要素を求める public T StreamRef(int n) { var xs = this; while (n-- > 0 && xs != nil) { xs = xs.StreamCdr(); } return xs.StreamCar(); }
StreamRef() は StreamCdr() を n 回繰り返して n 番目の要素を求めるだけです。
ストリームから n 個の要素を取り出して List<T> に格納して返すメソッド StreamTake() と先頭から n 個の要素を取り除くメソッド StreamDrop() も同様にプログラムすることができます。
リスト : n 個の要素を取り出す public List<T> StreamTake(int n) { var ys = new List<T>(); var xs = this; while (n-- > 0 && xs != nil) { ys.Add(xs.StreamCar()); xs = xs.StreamCdr(); } return ys; } public LazyS<T> StreamDrop(int n) { var xs = this; while (n-- > 0 && xs != nil) { xs = xs.StreamCdr(); } return xs; }
StreamTake() は StreamCar() と StreamCdr() を n 回繰り返して、要素を List<T> に格納して返します。StreamDrop() は StreamCdr() を n 回繰り返すだけです。
簡単な実行例を示しましょう。
> var s2 = LazyS<long>.Fibonacci(); > for (int i = 0; i < 20; i++) { * Console.Write("{0} ", s2.StreamRef(i)); * } 0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 > foreach(long x in s2.StreamDrop(20).StreamTake(10)) { * Console.WriteLine("{0}", x); * } 6765 10946 17711 28657 46368 75025 121393 196418 317811 514229
変数 s2 にフィボナッチ数列を生成するストリームをセットします。StreamRef() で順番に要素を 20 個取り出すと、その値はフィボナッチ数列になっていますね。同様に、StreamDrop() で 20 個の要素を取り除き、StreamTake() で 10 個の要素を取り出します。すると、その要素は 20 番目以降のフィボナッチ数列になります。
ところで、遅延ストリームは高階関数も定義することができます。次のリストを見てください。
リスト : 高階関数 // マッピング public LazyS<U> StreamMap<U>(Func<T, U> func) { if (this == nil) return LazyS<U>.nil; else return new LazyS<U>(func(StreamCar()), () => StreamCdr().StreamMap(func)); } // フィルター public LazyS<T> StreamFilter(Func<T, bool> pred) { if (this == nil) return nil; else if (pred(StreamCar())) return new LazyS<T>(StreamCar(), () => StreamCdr().StreamFilter(pred)); else return StreamCdr().StreamFilter(pred); } // 畳み込み public U StreamFoldLeft<U>(Func<U, T, U> func, U a) { var xs = this; while (xs != nil) { a = func(a, xs.StreamCar()); xs = xs.StreamCdr(); } return a; } public U StreamFoldRight<U>(Func<T, U, U> func, U a) { if (this == nil) return a; else return func(StreamCar(), StreamCdr().StreamFoldRight(func, a)); }
StreamMap() と StreamFilter() は関数と遅延ストリームを受け取り、新しい遅延ストリームを生成して返します。StreamMap() は引数のストリームの要素に関数 func を適用した結果を新しいストリームに格納して返します。StreamFilter() は述語 pred が真を返す要素だけを新しいストリームに格納して返します。
StreamFoldLeft() と StreamFoldRight() は遅延ストリームに対して畳み込み処理を行います。無限ストリームの場合は処理が終了しないので注意してください。
簡単な実行例を示しましょう。
> var s3 = LazyS<int>.Range(1, 100); > var s4 = s3.StreamMap(n => n * n); > foreach(int x in s4.StreamTake(10)) { * Console.Write("{0} ", x); * } 1 4 9 16 25 36 49 64 81 100 > var s5 = s3.StreamFilter(n => n % 2 == 0); > foreach(int x in s5.StreamTake(10)) { * Console.Write("{0} ", x); * } 2 4 6 8 10 12 14 16 18 20 > s3.StreamFoldLeft((sum, n) => sum + n, 0) 5050 > s3.StreamFoldRight((n, sum) => sum + n, 0) 5050
変数 s3 に 1 から始まる整数列を生成するストリームをセットします。次に、s3 の要素を 2 乗するストリームを StreamMap() で生成して変数 s4 にセットします。StreamTake() で s4 から要素を 10 個取り出すと、s3 の要素を 2 乗した値になります。
s3 から偶数列のストリームを得るには、引数が偶数のときに真を返す述語を StreamFilter() に渡します。その返り値を変数 s5 にセットして、StreamTake() で 10 個の要素を取り出すと、リストの要素は 2 から 20 までの値になります。
s3 は有限個の遅延ストリームなので畳み込みを行うことができます。StreamFoldLeft() と StreamFoldRight() で要素の合計値を求めると 5050 になります。
次はストリームを使って素数を求めるプログラムを作ってみましょう。考え方は簡単です。最初に、2 から始まる整数列を生成するストリームを用意します。2 は素数なので、素数ストリームの要素になります。次に、この整数列から 2 で割り切れる整数を取り除き除きます。これは StreamFilter() を使うと簡単です。
2 で割り切れる整数が取り除かれたので、次の要素は 3 になります。今度は 3 で割り切れる整数を取り除けばいいのです。これも StreamFilter() を使えば簡単です。このとき、入力用のストリームは 2 で割り切れる整数が取り除かれています。したがって、このストリームに対して 3 で割り切れる整数を取り除くように StreamFilter() を設定すればいいわけです。
このように、素数を見つけたらそれで割り切れる整数を取り除いていくアルゴリズムを「エラトステネスの篩」といいます。ようするに、2 から始まる整数ストリームに対して、見つけた素数 2, 3, 5, 7, 11, ... を順番に StreamFiter() で設定して素数でない整数をふるい落としていくわけです。
プログラムは次のようになります。
リスト : 素数の生成 LazyS<int> Sieve(LazyS<int> s) { int p = s.StreamCar(); return new LazyS<int>(p, () => Sieve(s.StreamCdr().StreamFilter(n => n % p != 0))); }
Sieve() には 2 から始まる整数列を生成するストリームを渡します。遅延オブジェクトを評価すると、StreamFilter() により整数列から 2 で割り切れる整数を取り除いたストリームが返されます。次の要素 3 を取り出すとき、このストリームに対して 3 で割り切れる整数を取り除くことになるので、2 と 3 で割り切れる整数が取り除かれることになります。次の要素は 5 になりますが、そのストリームからさらに 5 で割り切れる整数が StreamFilter() で取り除かれることになります。
このように StreamFilter() が設定されていくことで、素数でない整数をふるい落としていくことができるわけです。それでは実行してみましょう。
> LazyS<int> Sieve(LazyS<int> s) { * int p = s.StreamCar(); * return new LazyS<int>(p, () => Sieve(s.StreamCdr().StreamFilter(n => n % p != 0))); * } > var s6 = Sieve(LazyS<int>.Range(2, 10000)); > foreach(int x in s6.StreamTake(100)) { * Console.Write("{0} ", x); * } 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199 211 223 227 229 233 239 241 251 257 263 269 271 277 281 283 293 307 311 313 317 331 337 347 349 353 359 367 373 379 383 389 397 401 409 419 421 431 433 439 443 449 457 461 463 467 479 487 491 499 503 509 521 523 541
正常に動作していますね。
次は、2 つの遅延ストリームを受け取って 1 つの遅延ストリームを返すメソッドを考えてみましょう。一番簡単な操作は 2 つのストリームを連結することです。次のリストを見てください。
リスト : 遅延ストリームの連結 public LazyS<T> StreamAppend(LazyS<T> xs) { if (this == nil) return xs; else return new LazyS<T>(StreamCar(), () => StreamCdr().StreamAppend(xs)); }
関数 StreamAppend() は自分 (this) と引数のストリーム xs を連結したストリームを返します。処理は簡単で、自分の要素を順番に取り出していき、空になったら xs を返すだけです。
次は 2 つの遅延ストリームの要素を交互に出力する遅延ストリームを作ります。次のリストを見てください。
リスト : ストリームの要素を交互に出力 public LazySInterleave(LazyS xs) { if (this == nil) return xs; else return new LazyS (StreamCar(), () => xs.Interleave(StreamCdr())); }
メソッド Interleave() は自分 (this) のほかに引数 xs に遅延ストリームを受け取ります。そして、自分の要素を新しいストリームに格納したら、次は xs の要素を新しいストリームに格納します。これは遅延オブジェクトで Interleave を呼び出すとき、xs に対して Interleave() を呼び出し、引数に StreamCdr() の結果を渡すだけです。これで要素を交互に出力することができます。
簡単な実行例を示しましょう。
> var s7 = LazyS<int>.Range(1, 4); > var s8 = LazyS<int>.Range(5, 8); > var s9 = s7.StreamAppend(s8); > foreach(int x in s9.StreamTake(8)) { * Console.Write("{0} ", x); * } 1 2 3 4 5 6 7 8 > foreach(int x in s7.Interleave(s8).StreamTake(8)) { * Console.Write("{0} ", x); * } 1 5 2 6 3 7 4 8
StreamAppend() の場合、無限ストリームを連結することはできませんが、Interleave() ならば無限ストリームにも対応することができます。簡単な例を示しましょう。
リスト : 1 と 2 の無限数列 public static readonly LazyS<int> ones = new LazyS<int>(1, () => ones); public static readonly LazyS<int> twos = new LazyS<int>(2, () => twos);
ones は 1 を無限に出力するストリームで、twos は 2 を無限に出力するストリームです。どちらもクラス LazyS<T> の中で定義します。StreamAppend() で ones と twos を連結しても無限に 1 を出力するだけですが、Interleave() で ones と twos を連結すれば、1 と 2 を交互に出力することができます。これで無限ストリームの要素を混ぜ合わせることができます。
簡単な実行例を示します。
> foreach(int x in LazyS<int>.ones.StreamTake(8)) { * Console.Write("{0} ", x); * } 1 1 1 1 1 1 1 1 > foreach(int x in LazyS<int>.twos.StreamTake(8)) { * Console.Write("{0} ", x); * } 2 2 2 2 2 2 2 2 > foreach(int x in LazyS<int>.ones.Interleave(LazyS<int>.twos).StreamTake(10)) { * Console.Write("{0} ", x); * } 1 2 1 2 1 2 1 2 1 2
正常に動作していますね。
ところで、今回の遅延ストリームは Delay<T> を使っているので、評価値のキャッシングが行われますが、それをしなくてもよければ、C# のイテレータや LINQ を使って遅延ストリームと同様な動作を実現することができます。
イテレータ (yield return) で値を返す場合、返り値の型は IEnumerable, IEnumerable<T>, IEnumerator, IEnumerator<T> のいずれかであることが必要ですが、LINQ といっしょに使う場合は IEnumerable<T> にすると上手くいきます。
簡単な実行例を示します。
// // lazy.csx : イテレータと LINQ による遅延ストリーム // // Copyright (C) 2016-2022 Makoto Hiroi // using System.Linq; using System.Collections; using System.Collections.Generic; // Enumerable.Range() があるので不要 IEnumerable<int> Range(int start, int count) { while (count-- > 0) { yield return start++; } } IEnumerable<long> Fibonacci() { long a = 0, b = 1; while (true) { yield return a; long c = a; a = b; b += c; } } IEnumerable<int> Sieve(IEnumerable<int> s) { while (true) { int p = s.ElementAt(0); yield return p; s = s.Skip(1).Where(n => n % p != 0); } } void test() { var s1 = Range(1, 10000); var s2 = s1.Select(n => n * n); foreach(int x in s2.Take(10)) { Console.Write("{0} ", x); } Console.WriteLine(""); var s3 = s1.Where(n => n % 2 == 0); foreach(int x in s3.Take(10)) { Console.Write("{0} ", x); } Console.WriteLine(""); var s4 = Fibonacci(); foreach(long x in s4.Take(20)) { Console.Write("{0} ", x); } Console.WriteLine(""); var s5 = s4.Skip(20); foreach(long x in s5.Take(20)) { Console.Write("{0} ", x); } Console.WriteLine(""); foreach(int x in Sieve(Range(2, 10000)).Take(100)) { Console.Write("{0} ", x); } Console.WriteLine(""); } // 実行 test();
$ dotnet script lazy.csx 1 4 9 16 25 36 49 64 81 100 2 4 6 8 10 12 14 16 18 20 0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765 10946 17711 28657 46368 75025 121393 196418 317811 514229 832040 1346269 2178309 3524578 5702887 9227465 14930352 24157817 39088169 63245986 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199 211 223 227 229 233 239 241 251 257 263 269 271 277 281 283 293 307 311 313 317 331 337 347 349 353 359 367 373 379 383 389 397 401 409 419 421 431 433 439 443 449 457 461 463 467 479 487 491 499 503 509 521 523 541
// // delay.csx : 遅延評価 // // Copyright (C) 2016-2022 Makoto Hiroi // using System.Collections; using System.Collections.Generic; // 遅延評価 class Delay<T> { T Result { set; get; } bool Flag { set; get; } Func<T> Fn { get; } public Delay(Func<T> func) { Result = default(T); Flag = false; Fn = func; } public T Force() { if (!Flag) { Result = Fn(); Flag = true; } return Result; } } // 遅延ストリーム class LazyS<T> { T Car; Delay<LazyS<T>> Cdr; public LazyS(T item, Func<LazyS<T>> func) { Car = item; Cdr = new Delay<LazyS<T>>(func); } // 終端 public static readonly LazyS<T> nil = new LazyS<T>(default(T), () => nil); public bool IsEmpty() { return this == nil; } public T StreamCar() { return Car; } public LazyS<T> StreamCdr() { return Cdr.Force(); } // 整数列の生成 public static LazyS<int> Range(int start, int count) { if (count == 0) return LazyS<int>.nil; else return new LazyS<int>(start, () => Range(start + 1, count - 1)); } // フィボナッチ数列の生成 public static LazyS<long> Fibonacci(long a = 0, long b = 1) { return new LazyS<long>(a, () => Fibonacci(b, a + b)); } // 1 と 2 の無限数列 public static readonly LazyS<int> ones = new LazyS<int>(1, () => ones); public static readonly LazyS<int> twos = new LazyS<int>(2, () => twos); // 参照 public T StreamRef(int n) { var xs = this; while (n-- > 0 && xs != nil) { xs = xs.StreamCdr(); } return xs.StreamCar(); } // 要素を取り出す public List<T> StreamTake(int n) { var ys = new List<T>(); var xs = this; while (n-- > 0 && xs != nil) { ys.Add(xs.StreamCar()); xs = xs.StreamCdr(); } return ys; } // 要素を取り除く public LazyS<T> StreamDrop(int n) { var xs = this; while (n-- > 0 && xs != nil) { xs = xs.StreamCdr(); } return xs; } // 高階関数 // マッピング public LazyS<U> StreamMap<U>(Func<T, U> func) { if (this == nil) return LazyS<U>.nil; else return new LazyS<U>(func(StreamCar()), () => StreamCdr().StreamMap(func)); } // フィルター public LazyS<T> StreamFilter(Func<T, bool> pred) { if (this == nil) return nil; else if (pred(StreamCar())) return new LazyS<T>(StreamCar(), () => StreamCdr().StreamFilter(pred)); else return StreamCdr().StreamFilter(pred); } // 畳み込み public U StreamFoldLeft<U>(Func<U, T, U> func, U a) { var xs = this; while (xs != nil) { a = func(a, xs.StreamCar()); xs = xs.StreamCdr(); } return a; } public U StreamFoldRight<U>(Func<T, U, U> func, U a) { if (this == nil) return a; else return func(StreamCar(), StreamCdr().StreamFoldRight(func, a)); } // 連結 public LazyS<T> StreamAppend(LazyS<T> xs) { if (this == nil) return xs; else return new LazyS<T>(StreamCar(), () => StreamCdr().StreamAppend(xs)); } public LazyS<T> Interleave(LazyS<T> xs) { if (this == nil) return xs; else return new LazyS<T>(StreamCar(), () => xs.Interleave(StreamCdr())); } }
// // delaytest.csx : 遅延ストリームの簡単なテスト // // Copyright (C) 2016-2022 Makoto Hiroi // #load "delay.csx" // 素数 LazyS<int> Sieve(LazyS<int> s) { int p = s.StreamCar(); return new LazyS<int>(p, () => Sieve(s.StreamCdr().StreamFilter(n => n % p != 0))); } void test() { var s1 = LazyS<int>.Range(1, 10); while (!s1.IsEmpty()) { Console.WriteLine("{0}", s1.StreamCar()); s1 = s1.StreamCdr(); } var s2 = LazyS<long>.Fibonacci(); for (int i = 0; i < 20; i++) { Console.Write("{0} ", s2.StreamRef(i)); } foreach(long x in s2.StreamDrop(20).StreamTake(10)) { Console.WriteLine("{0}", x); } var s3 = LazyS<int>.Range(1, 100); var s4 = s3.StreamMap(n => n * n); foreach(int x in s4.StreamTake(10)) { Console.Write("{0} ", x); } Console.WriteLine(""); var s5 = s3.StreamFilter(n => n % 2 == 0); foreach(int x in s5.StreamTake(10)) { Console.Write("{0} ", x); } Console.WriteLine(""); Console.WriteLine("{0}", s3.StreamFoldLeft((sum, n) => sum + n, 0)); Console.WriteLine("{0}", s3.StreamFoldRight((n, sum) => sum + n, 0)); var s6 = Sieve(LazyS<int>.Range(2, 10000)); foreach(int x in s6.StreamTake(100)) { Console.Write("{0} ", x); } Console.WriteLine(""); var s7 = LazyS<int>.Range(1, 4); var s8 = LazyS<int>.Range(5, 8); var s9 = s7.StreamAppend(s8); foreach(int x in s9.StreamTake(8)) { Console.Write("{0} ", x); } Console.WriteLine(""); foreach(int x in s7.Interleave(s8).StreamTake(8)) { Console.Write("{0} ", x); } Console.WriteLine(""); foreach(int x in LazyS<int>.ones.StreamTake(8)) { Console.Write("{0} ", x); } Console.WriteLine(""); foreach(int x in LazyS<int>.twos.StreamTake(8)) { Console.Write("{0} ", x); } Console.WriteLine(""); foreach(int x in LazyS<int>.ones.Interleave(LazyS<int>.twos).StreamTake(10)) { Console.Write("{0} ", x); } Console.WriteLine(""); } // 実行 test();
$ dotnet script delaytest.csx 1 2 3 4 5 6 7 8 9 10 0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765 10946 17711 28657 46368 75025 121393 196418 317811 514229 1 4 9 16 25 36 49 64 81 100 2 4 6 8 10 12 14 16 18 20 5050 5050 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199 211 223 227 229 233 239 241 251 257 263 269 271 277 281 283 293 307 311 313 317 331 337 347 349 353 359 367 373 379 383 389 397 401 409 419 421 431 433 439 443 449 457 461 463 467 479 487 491 499 503 509 521 523 541 1 2 3 4 5 6 7 8 1 5 2 6 3 7 4 8 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 1 2 1 2 1 2 1 2 1 2