今回は拙作のページ Scheme Programming 「遅延ストリーム (1), (2)」で取り上げた遅延ストリームを Haskell でプログラムしてみましょう。Haskell はデフォルトで「遅延評価」を行う処理系なので、「リスト」をそのまま遅延ストリームとして扱うことができます。
たとえば、n から始まる整数列を生成する場合、 Haskell は [n .. ] で実現できますが、次のように定義することもできます。
リスト : 整数列の生成 intgen :: Integer -> [Integer] intgen n = n : intgen (n + 1)
関数 intgen を再帰呼び出しして、その返り値 (リスト) に整数 n を追加します。リストを生成する簡単なプログラムですが、再帰呼び出しの停止条件がないため、正格な評価を行うプログラミング言語では動作しません。ところが、Haskell のように遅延評価を行う処理系では、必要なときがくるまで intgen の評価は遅延されます。たとえば、関数 take で要素を 10 個取り出す場合、intgen は (10 - 1) 回だけしか評価されません。
それでは実行してみましょう。
ghci> :t take take :: Int -> [a] -> [a] ghci> take 3 [1,2,3,4,5,6] [1,2,3] ghci> take 0 [1,2,3,4,5,6] [] ghci> take 6 [1,2,3,4,5,6] [1,2,3,4,5,6]
ghci> take 10 (intgen 1) [1,2,3,4,5,6,7,8,9,10] ghci> take 10 (intgen 100) [100,101,102,103,104,105,106,107,108,109] ghci> nums = intgen 0 ghci> nums !! 10 10 ghci> nums !! 100 100
記号 !! は左辺のリストの要素を参照する演算子です。Haskell の場合、リストは 0 から数えます。nums は 0 から始まる整数列なので、10 番目の要素は 10 で、100 番目の要素は 100 になります。
trace を使うと、遅延評価されていることがよくわかります。次のプログラムを見てください。
リスト : 整数列の生成 (2) import Debug.Trace -- n から m までの整数列 intgen' :: Integer -> Integer -> [Integer] intgen' n m | n > m = [] | otherwise = n : trace "oops!" (intgen' (n + 1) m)
intgen' n m は n 以上 m 以下の整数列を生成します。Haskell の [n .. m] と同じですが、trace を使って intgen が評価されるときに文字列 "oops!" を画面に表示しています。それでは実行してみましょう。
ghci> intgen' 1 10 [1oops! ,2oops! ,3oops! ,4oops! ,5oops! ,6oops! ,7oops! ,8oops! ,9oops! ,10oops! ] ghci> take 5 (intgen' 1 10) [1oops! ,2oops! ,3oops! ,4oops! ,5]
intgen' 1 10 を評価すると、1 から 10 を格納したリストが生成されます。このとき、oops! が 10 回表示されていて、intgen' が 10 回評価されていることがわかります。これに対し、take 5 (intgen' 1 10) は oops! が 4 回しか評価されていません。残りの 6 回は評価されていない、つまり遅延評価されている、というわけです。
ところで、intgen' は無限リストではありませんが、Haskell では遅延ストリームとして機能します。これを次のように末尾再帰でプログラムすると、遅延ストリームにはなりません。
リスト : 整数列の生成 (3) intgen'' :: Integer -> Integer -> [Integer] intgen'' n m = iter m [] where iter m a | n > m = a | otherwise = trace "oops!" (iter (m - 1) (m:a))
局所関数 iter は末尾再帰になっていて、累積変数 a に m を追加してリストを生成しています。それでは実行してみましょう。
ghci> intgen'' 1 10 oops! oops! oops! oops! oops! oops! oops! oops! oops! oops! [1,2,3,4,5,6,7,8,9,10] ghci> take 5 (intgen'' 1 10) oops! oops! oops! oops! oops! oops! oops! oops! oops! oops! [1,2,3,4,5]
intgen'' 1 10 は oops! が 10 回表示されているので、intgen'' が 10 回呼び出されていることがわかります。これに対し、take 5 (intgen'' 1 10) も oops! が 10 回表示されいて、遅延ストリームとして動作していないことがわかります。intgen'' の場合、返り値となるリストの先頭要素は、プログラムを最後まで実行しないと求めることができません。つまり、正格な評価と同じになってしまうのです。
ところが、intgen は "要素 : 再帰呼び出し" の形になっているので、返り値のリストの先頭要素はすぐに求めることができますね。そして、次の要素が必要なるまで、intgen の評価を遅延することができます。このように、リストを遅延ストリームとして扱いたい場合は、末尾再帰ではなく次の形式でプログラムしてください。
関数 引数 ... = 要素 : 再帰呼び出し 引数 ...
今まで M.Hiroi は ML や Lisp / Scheme などでプログラムするとき、効率を重視して末尾再帰でプログラムすることが多かったのですが、Haskell のような遅延評価を行う処理系では、末尾再帰にこだわる必要はないのかもしれません。
フィボナッチ数列も簡単に作ることができます。
リスト : フィボナッチ数列の生成 fibgen :: Integer -> Integer -> [Integer] fibgen a b = a : fibgen b (a + b)
関数 fibgen の引数 a がフィボナッチ数列の最初の項で、b が次の項です。fibgen b (a + b) が評価されるたびに、フィボナッチ数列の新しい項が生成されていきます。Haskell の型 Integer は多倍長整数なので、メモリの許す限りフィボナッチ数列を生成することができます。
それでは実行してみましょう。
ghci> take 20 (fibgen 0 1) [0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181] ghci> take 40 (fibgen 0 1) [0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181,6765,10946,17711,28657, 46368,75025,121393,196418,317811,514229,832040,1346269,2178309,3524578,5702887,9227465, 14930352,24157817,39088169,63245986] ghci> fibs = fibgen 0 1 ghci> fibs !! 0 0 ghci> fibs !! 10 55 ghci> fibs !! 100 354224848179261915075
次は 2 つの遅延ストリームを結合することを考えてみましょう。Haskell の場合、リストは演算子 ++ で結合できますが、遅延ストリーム同士の結合はできません。簡単な例を示しましょう。
ghci> ones = 1 : ones ghci> take 10 ones [1,1,1,1,1,1,1,1,1,1] ghci> twos = 2 : twos ghci> take 10 twos [2,2,2,2,2,2,2,2,2,2] ghci> take 20 (ones ++ twos) [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]
変数 ones は要素が 1 の遅延ストリーム (循環リスト)、twos は要素が 2 の遅延ストリームになります。これは関数 repeat を使っても実現できます。簡単な実行例を示しましょう。
ghci> ones' = repeat 1 ghci> take 10 ones' [1,1,1,1,1,1,1,1,1,1] ghci> twos' = repeat 2 ghci> take 20 twos [2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2] ghci> take 20 (ones' ++ twos') [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]
ones ++ twos で生成した新しい遅延ストリームは、けっきょく左辺のストリームの要素しか取り出すことができないのです。
このような場合、ストリームの要素を交互に出力する関数を用意すると便利です。次のリストを見てください。
リスト : 遅延ストリームの要素を交互に出力 interleave :: [a] -> [a] -> [a] interleave [] ys = ys interleave (x:xs) ys = x : interleave ys xs
関数 interleave は 2 つのリスト xs, ys を受け取ります。そして、xs の要素 x をリストに格納したら、次は ys の要素をリストに格納します。これは interleave を再帰呼び出しするとき、引数 xs と ys の順番を交換するだけです。これで xs と ys の要素を交互に出力することができます。
簡単な実行例を示しましょう。
ghci> take 20 (interleave ones twos) [1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2]
Haskell の場合、マッピングやフィルターなどの高階関数はそのまま遅延ストリームにも適用できます。ただし、畳み込み foldl と foldr を無限リストに適用するときには注意が必要です。基本的に foldl は無限リストに適用できません。foldr の場合、返り値がリストであれば無限リストにも適用することができます。簡単な例を示しましょう。
ghci> take 20 (map (*2) [1..]) [2,4,6,8,10,12,14,16,18,20,22,24,26,28,30,32,34,36,38,40] ghci> take 20 (filter even [1..]) [2,4,6,8,10,12,14,16,18,20,22,24,26,28,30,32,34,36,38,40] ghci> take 20 (foldr (:) [] [1..]) [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
なお、foldr は値を求める処理、たとえば foldr (+) 0 [1..] は当然ですが実行できません。
遅延ストリームを操作する場合、2 つの遅延ストリームを受け取るマップ関数 map2 があると便利です。プログラムは次のようになります。
リスト : マップ関数 map2 :: (a -> b -> c) -> [a] -> [b] -> [c] map2 f (x:xs) (y:ys) = f x y : map2 f xs ys map2 _ _ _ = []
リスト xs と ys からそれぞれの要素を取り出して関数 f に渡します。そして、その評価結果を map2 の返り値 (リスト) に格納します。どちらかのリストが空リストのときが再帰呼び出しの停止条件です。これで有限なリストにも無限の遅延ストリームにも対応することができます。なお、Haskell には同じ働きをする関数 zipWith が用意されています。
簡単な実行例を示しましょう。
ghci> map2 (+) [1,2,3,4,5] [6,7,8,9,10] [7,9,11,13,15] ghci> map2 (*) [1 ..] [6,7,8,9,10] [6,14,24,36,50] ghci> take 20 (map2 (*) [1 ..] [10 ..]) [10,22,36,52,70,90,112,136,162,190,220,252,286,322,360,400,442,486,532,580] ghci> zipWith (+) [1,2,3,4,5] [6,7,8,9,10] [7,9,11,13,15] ghci> zipWith (*) [1 ..] [6,7,8,9,10] [6,14,24,36,50] ghci> take 20 (zipWith (*) [1 ..] [10 ..]) [10,22,36,52,70,90,112,136,162,190,220,252,286,322,360,400,442,486,532,580]
map2 (zipWith) を使うと、遅延ストリームに対していろいろな処理を定義することができます。次の例を見てください。
ghci> add_stream s1 s2 = zipWith (+) s1 s2 ghci> s3 = add_stream [1 ..] [100 ..] ghci> take 20 s3 [101,103,105,107,109,111,113,115,117,119,121,123,125,127,129,131,133,135,137,139]
add_stream は s1 と s2 の要素を加算した遅延ストリームを返します。この add_stream を使うと、整数を生成する遅延ストリームは次のように定義することができます。
ghci> ints = 1 : add_stream ones ints ghci> take 20 ints [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20] ghci> fibs = 0 : 1 : add_stream (tail fibs) fibs ghci> take 20 fibs [0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181]
ストリーム ints は、現在の ints に 1 を足し算することで整数を生成しています。fibs は現在のフィボナッチ数列を表していて、tail fibs で次の要素を求め、それらを足し算することで、その次の要素を求めています。この場合、ストリームの初期値として 2 つの要素が必要になることに注意してください。
次は、2 つのストリームからその要素の組み合わせを生成するストリームを作りましょう。要素が n 個のストリームの場合、組み合わせは n * n 個あります。次の図を見てください。
(a0, b0) (a0, b1) (a0, b2) ... (a0, bn) (a1, b0) (a1, b1) (a1, b2) ... (a1, bn) (a2, b0) (a2, b1) (a2, b2) ... (a2, bn) ... (an, b0) (an, b1) (an, b2) ... (an, bn) 図 : n * n 個の組
これは「直積集合」を求めることと同じです。Haskell の場合、リスト内包表記を使って次のようにプログラムできます。
リスト : 組の生成 (1) pair_stream :: [a] -> [b] -> [(a, b)] pair_stream xs ys = [(x, y) | x <- xs, y <- ys]
実行例を示します。
ghci> pair_stream [1..4] [5..8] [(1,5),(1,6),(1,7),(1,8),(2,5),(2,6),(2,7),(2,8),(3,5),(3,6),(3,7),(3,8),(4,5), (4,6),(4,7),(4,8)]
ところが、この方法では無限リストに対応できません。実際、ys に無限リストを渡した場合、xs の最初の要素を a0 とすると (a0, ys の要素) という組しか生成されません。実際に試してみましょう。
ghci> take 10 (pair_stream [1..4] [5..]) [(1,5),(1,6),(1,7),(1,8),(1,9),(1,10),(1,11),(1,12),(1,13),(1,14)]
そこで、下図に示すように、対角線上に組を生成していくことにします。
| a0 a1 a2 a3 a4 a5 ---+----------------------------- b0 | 0 1 3 6 10 15 ... | b1 | 2 4 7 11 16 ... | b2 | 5 8 12 17 ... | b3 | 9 13 18 ... | b4 | 14 19 ... | b5 | 20 ... | | ... | 図 : 無限ストリームによる組の生成
図を見ればおわかりのように、対角線の要素数を n とすると、組は (an-1, b0), (an-2, b1), ..., (a1, bn-2), (a0, bn-1) となっています。これは、xs から n 個の要素を取り出したリストと、ys から n 個の要素を取り出して反転したリストを zip でまとめた形になっています。プログラムは次のようになります。
リスト : 組の生成 (2) pair_stream' :: [a] -> [b] -> [(a, b)] pair_stream' xs ys = makePair 1 xs ys where makePair n xs ys = zip (take n xs) (reverse (take n ys)) ++ makePair (n + 1) xs ys
実際の処理は局所関数 makePair で行っています。引数 n が対角線上の要素数を表します。take で xs と ys から要素を取り出し、ys から取り出したリストを reverse で反転してから zip でタプルに格納します。そして、再帰呼び出しした makePair の返り値にそのリストを連結します。これで無限リストに対応することができます。
それでは実行してみましょう。
ghci> take 50 (pair_stream' [1..] [1..]) [(1,1),(1,2),(2,1),(1,3),(2,2),(3,1),(1,4),(2,3),(3,2),(4,1),(1,5),(2,4),(3,3), (4,2),(5,1),(1,6),(2,5),(3,4),(4,3),(5,2),(6,1),(1,7),(2,6),(3,5),(4,4),(5,3), (6,2),(7,1),(1,8),(2,7),(3,6),(4,5),(5,4),(6,3),(7,2),(8,1),(1,9),(2,8),(3,7), (4,6),(5,5),(6,4),(7,3),(8,2),(9,1),(1,10),(2,9),(3,8),(4,7),(5,6)] ghci> s1 = pair_stream' [1..] [1..] ghci> s1 !! 10 (1,5) ghci> s1 !! 49 (5,6)
正常に動作していますね。
GHC のモジュール Data.List には、リストを集合とみなして、和集合と積集合を求める関数 union と intersect が用意されています。簡単な使用例を示します。
ghci> :m Data.List ghci> :t union union :: Eq a => [a] -> [a] -> [a] ghci> :t intersect intersect :: Eq a => [a] -> [a] -> [a] ghci> union [1,2,3,4] [3,4,5,6] [1,2,3,4,5,6] ghci> union [] [3,4,5,6] [3,4,5,6] ghci> union [1,2,3,4] [] [1,2,3,4] ghci> union [1,2,3,4] [5,6,7,8] [1,2,3,4,5,6,7,8] ghci> intersect [1,2,3,4] [3,4,5,6] [3,4] ghci> intersect [] [3,4,5,6] [] ghci> intersect [1,2,3,4] [] [] ghci> intersect [1,2,3,4] [5,6,7,8] []
union と intersect は無限ストリームに対応していませんが、無限ストリームには重複要素が存在せず、要素は昇順に出力されることを前提にすると、無限ストリームでも集合演算を行うことができます。次のリストを見てください。
リスト : 集合演算 -- 和集合 union' :: Ord a => [a] -> [a] -> [a] union' [] ys = ys union' xs [] = xs union' xs1@(x:xs) ys1@(y:ys) | x == y = x : union' xs ys | x < y = x : union' xs ys1 | otherwise = y : union' xs1 ys -- 積集合 intersect' :: Ord a => [a] -> [a] -> [a] intersect' [] _ = [] intersect' _ [] = [] intersect' xs1@(x:xs) ys1@(y:ys) | x == y = x : intersect' xs ys | x < y = intersect' xs ys1 | otherwise = intersect' xs1 ys
union' は引数のリストから要素 x, y を取り出して、小さいほうをリストに追加します。等しい場合は要素をひとつだけ追加します。このとき、両方の引数から先頭要素を取り除くことに注意してください。
intersect’ も簡単です。リストの先頭要素を比較して、等しい場合はその要素をリストに追加します。x が y よりも小さい場合は、x を取り除いて次の要素を調べます。y が小さい場合は y を取り除いて次の要素を調べます。
簡単な実行例を示しましょう。
ghci> s1 = map (\x -> x * (x + 1) `div` 2) [1..] ghci> s2 = map (^2) [1..] ghci> take 10 s1 [1,3,6,10,15,21,28,36,45,55] ghci> take 10 s2 [1,4,9,16,25,36,49,64,81,100] ghci> take 20 $ union' s1 s2 [1,3,4,6,9,10,15,16,21,25,28,36,45,49,55,64,66,78,81,91] ghci> take 6 $ intersect' s1 s2 [1,36,1225,41616,1413721,48024900]
遅延ストリーム s1 は「三角数」、s2 は「四角数」を表します。これらの遅延ストリームを union' でまとめると、三角数または四角数の数列になります。intersect' でまとめると、三角数かつ四角数の数列 (平方三角数) になります。平方三角数は拙作のページ Puzzle DE Progamming 「多角数」でも取り上げています。興味のある方はお読みくださいませ。
ここで union' を使うと簡単に解ける問題を紹介しましょう。
7 以上の素数で割り切れない正の整数は、素因子が 2, 3, 5 しかない自然数のことで、これを「ハミング数 (Hamming Numbers)」といいます。ハミング数は素因数分解したとき、2i * 3j * 5k (i, j, k >= 0) の形式になります。たとえば、100 以下のハミング数は次のようになります。
1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20, 24, 25, 27, 30, 32, 36, 40, 45, 48, 50, 54, 60, 64, 72, 75, 80, 81, 90, 96, 100
遅延ストリームを使うと「ハミングの問題」は簡単に解くことができます。小さい順にハミング数を出力する遅延ストリームを hs としましょう。hs は 1 から始まるので次のように定義できます。
hs = 1 : ...
最初の要素は 1 なので、それに 2, 3, 5 を掛け算した値 (2, 3, 5) もハミング数になります。この値は次の式で生成することができます。
(map (*2) hs) (map (*3) hs) (map (*5) hs)
あとは、これらの遅延ストリームを union' でひとつにまとめれば、ハミング数を小さい順に出力することができます。
実行結果を示します。
ghci> hs = 1 : union' (map (*2) hs) (union' (map (*3) hs) (map (*5) hs)) ghci> take 100 hs [1,2,3,4,5,6,8,9,10,12,15,16,18,20,24,25,27,30,32,36,40,45,48,50,54,60,64,72,75, 80,81,90,96,100,108,120,125,128,135,144,150,160,162,180,192,200,216,225,240,243, 250,256,270,288,300,320,324,360,375,384,400,405,432,450,480,486,500,512,540,576, 600,625,640,648,675,720,729,750,768,800,810,864,900,960,972,1000,1024,1080,1125, 1152,1200,1215,1250,1280,1296,1350,1440,1458,1500,1536]
次は無限の素数列を生成するプログラムを作ってみましょう。整数 n が素数か確かめる簡単な方法は、√n 以下の素数で割り切れるか試してみることです。割り切れる素数があれば、n は素数ではありません。そうでなければ、n は素数であることがわかります。
これをそのままプログラムすると次のようになります。
リスト : 素数列の生成 checkPrime :: Integer -> Bool checkPrime n = all (\x -> mod n x /= 0) (takeWhile (\x -> x * x <= n) primes) primesFrom :: Integer -> [Integer] primesFrom n | checkPrime n = n : primesFrom (n + 2) | otherwise = primesFrom (n + 2) primes = 2 : 3 : 5 : primesFrom 7
変数 primes は無限の素数列を表します。実際に素数を生成する処理は関数 primesFrom で行います。primesFrom は関数 checkPrime を呼び出して n が素数かチェックします。素数であれば primsFrom を再帰呼び出しして、その返り値に n を追加します。そうでなければ primesFrom を再帰呼び出しするだけです。偶数は素数ではないので、引数 n には奇数を与えていることに注意してください。
checkPrime も簡単です。takeWhile で primes から √n 以下の素数列を取り出します。√n 以下の素数は生成済みなので、primes から takeWhile で取り出すことが可能です。ここでは√n のかわりに条件を x * x <= n としています。あとは、関数 all を使って、取り出した素数で n が割り切れないことを確認するだけです。
それでは実行してみましょう。
ghci> take 25 primes [2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97] ghci> primes !! 99 541 ghci> primes !! 500 3581
100 以下の素数は全部で 25 個あります。また、100 番目の素数は 541 になります。Haskell のリストは 0 から数えるので、primes !! 99 で 100 番目の素数になります。
もうひとつ素数列を生成する簡単な方法を紹介しましょう。考え方は簡単です。最初に、2 から始まる整数列を生成するストリームを用意します。2 は素数なので、素数ストリームの要素になります。次に、この整数列から 2 で割り切れる整数を取り除き除きます。これは filter を使うと簡単です。
2 で割り切れる整数が取り除かれたので、次の要素は 3 になります。今度は 3 で割り切れる整数を取り除けばいいのです。これも filter を使えば簡単です。このとき、入力用のストリームは 2 で割り切れる整数が取り除かれています。したがって、このストリームに対して 3 で割り切れる整数を取り除くように filter を設定すればいいわけです。
このように、素数を見つけたらそれで割り切れる整数を取り除いていくアルゴリズムを「エラトステネスの篩」といいます。ようするに、2 から始まる整数ストリームに対して、見つけた素数 2, 3, 5, 7, 11, ... を順番に fiter で設定して素数でない整数をふるい落としていくわけです。
プログラムは次のようになります。リスト : 素数列の生成 primes' = 2 : sieve [3, 5 ..] where sieve (x:xs) = x : sieve (filter (\y -> y `mod` x /= 0) xs)
primes' は素数列を表します。先頭が 2 で、その後の素数列を局所関数 sieve で生成します。sieve には 3 から始まる奇数列を渡します。2 から始まる整数列を渡すよりも、このほうが効率的です。
sieve を再帰呼び出しするとき、filter でリストから先頭要素 x で割り切れる数を filter で取り除きます。最初に奇数列から 3 で割り切れる整数を取り除いたリストが返されます。次の要素 5 を取り出すとき、このリストに対して 5 で割り切れる整数を取り除くことになるので、3 と 5 で割り切れる整数が取り除かれることになります。
次の要素は 7 になりますが、そのストリームからさらに 7 で割り切れる整数が filter で取り除かれることになります。このように filter を重ねて設定していくことで、素数でない整数をふるい落としていくことができるわけです。
それでは実行してみましょう。
ghci> take 25 primes' [2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97] ghci> primes' !! 99 541
正常に動作していますね。ところで、primes' の実行速度は primes よりもかなり遅いです。興味のある方は primes' の高速化に挑戦してみてください。
素数列を生成するプログラムは「Gauche ユーザリファレンス: 6.19 遅延評価」を参考にさせていただきました。Shiro Kawai さんに感謝いたします。