M.Hiroi's Home Page

Functional Programming

お気楽 Haskell プログラミング入門

[ PrevPage | Haskell | NextPage ]

遅延評価

今回は Haskell の特徴である「遅延評価 (delayed evaluation または lazy evaluation)」について説明します。

●Haskell の遅延評価

一般的なプログラミング言語の場合、関数を呼び出す前に引数が評価され、その結果が関数に渡されます。これを「正格 (strict) な評価」といいます。Haskell は正格評価ではありません。関数の引数は、その値が必要になるまで評価は行われません。具体的には、引数を参照するときに評価が行われます。これを「遅延評価」といいます。遅延評価は関数の引数を評価するときだけではなく、変数の値を評価するときにも行われます。

他のプログラミング言語、たとえば Scheme では delay と force を使って遅延評価を行うことができます。Scheme の場合、評価結果は保存 (キャッシュ) されるので、再度引数 (変数) を参照すると、保存されている値が返されます。GHC の場合、昔のバージョンでは Scheme と同様に評価結果をキャッシュしていたのですが、今のバージョン (ver 8.8 4) ではキャッシュされません。

簡単な例を示しましょう。

Prelude> :{
Prelude| fibo 0 = 0
Prelude| fibo 1 = 1
Prelude| fibo n = fibo (n - 1) + fibo (n - 2)
Prelude| :}
Prelude> :set +s
Prelude> fibo 30
832040
(2.86 secs, 781,575,904 bytes)
Prelude> a = fibo 30
(0.00 secs, 30,944 bytes)
Prelude> a
832040
(2.84 secs, 781,575,856 bytes)
Prelude> a
832040
(2.86 secs, 781,575,856 bytes)

関数 fibo はフィボナッチ数を返します。fibo は二重再帰になっているので、実行時間はとても遅いです。ghci で実行時間を計測するときは、コマンド :set で +s を指定してください。実行時間と使用したメモリを表示します。取り消す場合はコマンド :unset を使います。fibo 30 を評価すると 3 秒ほど時間がかかります。

次に a = fibo 30 を評価します。Haskell は遅延評価なので、変数 a を参照するまで fibo 30 は評価されません。すぐにプロンプトが表示されます。その次に変数 a を参照します。このとき、fibo 30 が評価されるので、a の値を表示するのに約 3 秒かかります。再度 a を参照すると、a の値を表示するのに同じ時間がかかるので、ghci (ver 8.8.4) は評価結果をキャッシュしていないことがわかります。

なお、これは GHC ver 8.4.4 での動作です。Haskell には複数の実装があるので、評価結果をキャッシュする処理系があるかもしれません。ご注意くださいませ。

●たらいまわし関数

それでは、遅延評価の簡単な例題として「たらいまわし関数」を取り上げます。次のリストを見てください。

リスト : たらいまわし関数

tarai :: Int -> Int -> Int -> Int
tarai x y z =
  if x <= y then y
  else tarai (tarai (x - 1) y z) (tarai (y - 1) z x) (tarai (z - 1) x y)

tak :: Int -> Int -> Int -> Int
tak x y z =
  if x <= y then z
  else tak (tak (x - 1) y z) (tak (y - 1) z x) (tak (z - 1) x y)

関数 tarai や tak は「たらいまわし関数」といって、再帰的に定義されています。これらの関数は、引数の与え方によっては実行に時間がかかるため、Lisp などのベンチマークに利用されることがあります。

関数 tarai は通称「竹内関数」と呼ばれていて、日本の代表的な Lisper である竹内郁雄先生によって考案されたそうです。そして、関数 tak は関数 tarai のバリエーションで、John Macarthy 先生によって作成されたそうです。たらいまわし関数が Lisp のベンチマークで使われていたことは知っていましたが、このような由緒ある関数だとは思ってもいませんでした。

関数 tarai は遅延評価を行う処理系では高速に実行できることが知られています。正格な評価を行うプログラミング言語では時間がかかっても、Haskell ではあっというまに計算することができます。

tarai のプログラムを見てください。x <= y のときに y を返しますが、このとき引数 z の値は必要ありませんね。Haskell は x > y のときにだけ引数 z を評価するので、無駄な計算を省略することができます。なお、関数 tak は x <= y のときに z を返しているため、遅延評価で高速化することはできません。ご注意ください。

実行時間を比較するため、x <= y のときにも引数 z を評価する関数 tarai' を作ります。次のリストを見てください。

リスト : たらいまわし関数 (2)

tarai' :: Int ->  Int -> Int -> Int
tarai' x y z =
  if x <= y then z `seq` y
  else tarai' (tarai' (x - 1) y z) (tarai' (y - 1) z x) (tarai' (z - 1) x y)

関数 seq x y は x を評価してから y を評価する関数です。返り値は y の評価結果です。x `seq` y `seq` z とすると、x, y, z の順番で評価し、最後に評価した z の結果を返します。

簡単な実行例を示します。

Prelude>:m Debug.Trace
Prelude Debug.Trace> trace "first" 10 `seq` trace "second" 20 `seq` trace "third" 30
first
second
third
30

Haskell のモジュール Debug.Trace にある関数 trace を使うと、seq の引数が順番に評価されていることがよくわかると思います。trace は第 2 引数の式を評価して返しますが、このとき第 1 引数の文字列を画面に表示します。:m はモジュールをロードする ghci のコマンドです。

tarai' は x <= y のときに z `seq` y で引数 z を評価して無駄な計算を行っているので、実行時間はとても遅くなるはずです。それでは実行してみましょう。

Prelude> :l tarai.hs
[1 of 1] Compiling Main             ( tarai.hs, interpreted )
Ok, one module loaded.
*Main> :set +s
*Main> tarai 12 6 0
12
(0.01 secs, 89,040 bytes)
*Main> tarai' 12 6 0
12
(8.19 secs, 2,445,409,256 bytes)

実行環境 : Ubunts 18.04 (WSL), Intel Core i5-6200U 2.30GHz

結果は一目瞭然で、tarai' は約 8.2 秒かかりますが、tarai は瞬時に計算を終えてしまいます。なお、これはインタプリタ ghci での結果であり、コンパイルするともっと高速になるでしょう。興味のある方はいろいろ試してみてください。

●末尾再帰

再帰定義のなかで、処理の最後で再帰呼び出しを行う場合を「末尾再帰 (tail recursion)」といいます。英語では tail recursion ですが、日本語では末尾再帰のほかに末端再帰とか終端再帰と呼ぶことがあります。末尾再帰は簡単な処理で繰り返しに変換できることが知られています。

ML や Lisp などの関数型言語や論理型言語の Prolog では、プログラムをコンパイルもしくは実行するときに、末尾再帰を繰り返しに変換する処理系があります。この機能を「末尾再帰最適化」[*1] といいます。なかには Scheme のように、言語仕様に末尾再帰最適化を行うことを明記しているプログラミング言語もあります。

たとえば、階乗を計算する関数 fact を思い出してください。

リスト : 階乗

fact :: Integer -> Integer
fact 0 = 1
fact n = n * fact (n - 1)

上記リストの fact は最後に n と fact の返り値を乗算しているので、このプログラムは末尾再帰ではありません。これを末尾再帰に変換すると次のようになります。

リスト : 階乗 (末尾再帰)

fact :: Integer -> Integer
fact n = facti n 1 where
  facti 0 a = a
  facti n a = facti (n - 1) (a * n)

局所関数 facti を見てください。最後の再帰呼び出しで、facti の返り値をそのまま返しているので、このプログラムは末尾再帰になっています。これで階乗を計算できるなんて、ちょっと不思議に思われるかもしれません。そこが再帰呼び出しの面白いところです。このプログラムでは引数 a の使い方がポイントです。

たとえば facti 4 1 を実行すると、このプログラムでは 4 * 3 * 2 * 1 を計算しています。このとき、計算の途中経過を変数 a に記憶しているのです。正格評価を行う処理系で、末尾再帰最適化を適用する前の facti の呼び出しを図に示すと、次のようになります。

facti 4 1
  facti 3 4
    facti 2 12 
      facti 1 24
        facti 0 24
        => a の値 24 を返す
      => 24
    => 24
  => 24
=> 24

変数 a には計算途中の値が格納されていることがわかります。このような変数を「累算変数」とか「累算器」といいます。純粋な関数型言語では、while や loop など繰り返しの構文が用意されていない言語があります。Haskell もそうですが、論理型言語の Prolog にも単純な繰り返しはありません。これらのプログラミング言語では、繰り返しのかわりに末尾再帰を使ってプログラミングを行い、末尾再帰最適化によりプログラムを高速に実行することができます。

ただし、Haskell は遅延評価を行う処理系なので、上図のような動作をするとは限りません。これはあとで詳しく説明します。

-- note --------
[*1] 末尾再帰最適化は一般的な呼び方で、厳密には「末尾呼び出し最適化」とか「末尾最適化」といいます。詳しい説明は拙作のページ Scheme Programming 関数型電卓プログラム fncalc の作成 (4) 末尾再帰とは? をお読みください。

●フィボナッチ関数の高速化

今度は累算変数を使って、二重再帰を末尾再帰へ変換してみましょう。例題としてフィボナッチ関数を取り上げます。再帰定義 で作成したフィボナッチ関数は二重再帰になっているので、とても時間がかかります。二重再帰を末尾再帰に変換すると、プログラムを高速に実行することができます。次のリストを見てください。

リスト : フィボナッチ関数 (末尾再帰)

fibo :: Integer -> Integer
fibo n = fiboi n 0 1 where
  fiboi 0 a _  = a
  fiboi n a b = fiboi (n - 1) b (a + b)

局所関数 fiboi の累算変数 a と b の使い方がポイントです。現在のフィボナッチ数を変数 a に、ひとつ先の値を変数 b に格納しておきます。あとは a と b を足し算して、新しいフィボナッチ数を計算すればいいわけです。fiboi の呼び出しを図に示すと、次のようになります。

fiboi 5 0 1
  fiboi 4 1 1
    fiboi 3 1 2
      fiboi 2 2 3
        fiboi 1 3 5
          fiboi 0 5 8
          => a の値 5 を返す
        => 5
      => 5
    => 5
  => 5
=> 5

図 : 関数 fiboi の呼び出し

二重再帰では、同じ値を何回も求めていたため効率がとても悪かったのですが、このプログラムでは無駄な計算を行っていないので、値を高速に求めることができます。

●末尾再帰と正格評価

Haskell は遅延評価を行う処理系なので、「末尾再帰」でプログラムしたからといって、正格評価を行う処理系と同じ動作になるとは限りません。たとえば、整数 1 から n までの合計値を求める関数 sum' を作ってみましょう。

リスト : 1 から n までの合計値を求める

sum' :: Integer -> Integer
sum' 0 = 0
sum' n = n + sum' (n - 1)

-- 末尾再帰
sumi :: Integer -> Integer
sumi n = iter n 0
  where
    iter 0 a = a
    iter n a = iter (n - 1) (a + n)

どちらも簡単な関数ですが、n が大きな値になると関数 sum' はスタックオーバーフローします。

*Main> sum' 10000000
*** Exception: stack overflow

これに対し、末尾再帰でプログラムした sumi はスタックオバーフローしませんが、そのかわりにメモリスワップが発生して、実行速度が極端に遅くなります。これは Haskell の遅延評価が原因です。

累積変数 a の値は、sumi の返り値が必要となるまで評価されません。たとえば、n = 5 とすると、再帰呼び出しで (((((0 + 5) + 4) + 3) + 2) + 1) という式が組み立てられ、sumi の返り値を評価するときに、この式が計算されて合計値が求められます。次のように、trace を使うと式 (a + n) が遅延評価されていることがよくわかります。

リスト : トレース表示

import Debug.Trace

sumi :: Integer -> Integer
sumi n = iter n 0
  where
    iter 0 a = trace "oops0" a
    iter n a = iter (n - 1) (trace "oops1" (a + n))
*Main> sumi 5
oops0
oops1
oops1
oops1
oops1
oops1
15

最初に iter の返り値 a が評価され、そのことにより組み立てられていた式が順番に評価されていくことがわかると思います。この場合、組み立てた式を保存するメモリが必要になるため、末尾再帰でプログラムしても n が大きくなるとメモリを大量に消費してしまうのです。

このような場合、正格評価を行う関数 seq を使うとうまくいきます。次のリストを見てください。

リスト : 正格評価バージョン

sumi' :: Integer -> Integer
sumi' n = iter n 0
  where
    iter 0 a = a
    iter n a = a `seq` iter (n - 1) (n + a)

seq で累積変数 a を強制的に評価します。これで再帰呼び出しが行われるたびに式を計算するので、n が大きな値でもメモリを大量消費せずに計算することができます。

実際に試してみましょう。

*Main> sum' 10000000
*** Exception: stack overflow
*Main> sumi' 10000000
50000005000000

ghci での実行なので、ちょっと時間がかかりますが、値を求めることができました。なお、GHC のモジュールには正格評価を行う関数も用意されています。たとえば、畳み込みを行う関数 foldl には、正格評価を行う関数 foldl' がモジュール Data.List に用意されています。

簡単な実行例を示します。

Prelude> :m Data.List
Prelude Data.List> :set +s
Prelude Data.List> foldl (+) 0 [1 .. 1000000]
500000500000
(0.55 secs, 161,304,744 bytes)
Prelude Data.List> foldl' (+) 0 [1 .. 1000000]
500000500000
(0.09 secs, 88,073,904 bytes)

実行環境 : Ubunts 18.04 (WSL), Intel Core i5-6200U 2.30GHz

foldl よりも foldl' のほうが速いみたいですね。興味のある方はいろいろ試してみてください。


初版 2013 年 1 月 27 日
改訂 2021 年 1 月 11 日

遅延ストリーム

今回は拙作のページ Scheme Programming 遅延ストリーム (1), (2) で取り上げた「遅延ストリーム」を Haskell でプログラムしてみましょう。Haskell はデフォルトで「遅延評価」を行う処理系なので、「リスト」をそのまま遅延ストリームとして扱うことができます。

●整数列の生成

たとえば、n から始まる整数列を生成する場合、 Haskell は [n .. ] で実現できますが、次のように定義することもできます。

リスト : 整数列の生成

intgen :: Integer -> [Integer]
intgen n = n : intgen (n + 1)

関数 intgen を再帰呼び出しして、その返り値 (リスト) に整数 n を追加します。リストを生成する簡単なプログラムですが、再帰呼び出しの停止条件がないため、正格な評価を行うプログラミング言語では動作しません。ところが、Haskell のように遅延評価を行う処理系では、必要なときがくるまで intgen の評価は遅延されます。たとえば、関数 take で要素を 10 個取り出す場合、intgen は (10 - 1) 回だけしか評価されません。

それでは実行してみましょう。

*Main> :t take
take :: Int -> [a] -> [a]
*Main> take 3 [1,2,3,4,5,6]
[1,2,3]
*Main> take 0 [1,2,3,4,5,6]
[]
*Main> take 6 [1,2,3,4,5,6]
[1,2,3,4,5,6]
*Main> take 10 (intgen 1)
[1,2,3,4,5,6,7,8,9,10]
*Main> take 10 (intgen 100)
[100,101,102,103,104,105,106,107,108,109]
*Main> nums = intgen 0
*Main> nums !! 10
10
*Main> nums !! 100
100

記号 !! は左辺のリストの要素を参照する演算子です。Haskell の場合、リストは 0 から数えます。nums は 0 から始まる整数列なので、10 番目の要素は 10 で、100 番目の要素は 100 になります。

trace を使うと、遅延評価されていることがよくわかります。次のプログラムを見てください。

リスト : 整数列の生成 (2)

import Debug.Trace

-- n から m までの整数列
intgen' :: Integer -> Integer -> [Integer]
intgen' n m
  | n > m     = []
  | otherwise =  n : trace "oops!" (intgen' (n + 1) m)

intgen' n m は n 以上 m 以下の整数列を生成します。Haskell の [n .. m] と同じですが、trace を使って intgen が評価されるときに文字列 "oops!" を画面に表示しています。それでは実行してみましょう。

*Main> intgen' 1 10
[1oops!
,2oops!
,3oops!
,4oops!
,5oops!
,6oops!
,7oops!
,8oops!
,9oops!
,10oops!
]
*Main> take 5 (intgen' 1 10)
[1oops!
,2oops!
,3oops!
,4oops!
,5]

intgen' 1 10 を評価すると、1 から 10 を格納したリストが生成されます。このとき、oops! が 10 回表示されていて、intgen' が 10 回評価されていることがわかります。これに対し、take 5 (intgen' 1 10) は oops! が 4 回しか評価されていません。残りの 6 回は評価されていない、つまり遅延評価されている、というわけです。

ところで、intgen' は無限リストではありませんが、Haskell では遅延ストリームとして機能します。これを次のように末尾再帰でプログラムすると、遅延ストリームにはなりません。

リスト : 整数列の生成 (3)

intgen'' :: Integer -> Integer -> [Integer]
intgen'' n m = iter m [] where
  iter m a
     | n > m     = a
     | otherwise = trace "oops!" (iter (m - 1) (m:a))

局所関数 iter は末尾再帰になっていて、累積変数 a に m を追加してリストを生成しています。それでは実行してみましょう。

*Main> intgen'' 1 10
oops!
oops!
oops!
oops!
oops!
oops!
oops!
oops!
oops!
oops!
[1,2,3,4,5,6,7,8,9,10]
*Main> take 5 (intgen'' 1 10)
oops!
oops!
oops!
oops!
oops!
oops!
oops!
oops!
oops!
oops!
[1,2,3,4,5]

intgen'' 1 10 は oops! が 10 回表示されているので、intgen'' が 10 回呼び出されていることがわかります。これに対し、take 5 (intgen'' 1 10) も oops! が 10 回表示されいて、遅延ストリームとして動作していないことがわかります。intgen'' の場合、返り値となるリストの先頭要素は、プログラムを最後まで実行しないと求めることができません。つまり、正格な評価と同じになってしまうのです。

ところが、intgen は "要素 : 再帰呼び出し" の形になっているので、返り値のリストの先頭要素はすぐに求めることができますね。そして、次の要素が必要なるまで、intgen の評価を遅延することができます。このように、リストを遅延ストリームとして扱いたい場合は、末尾再帰ではなく次の形式でプログラムしてください。

関数 引数 ... = 要素 : 再帰呼び出し 引数 ...

今まで M.Hiroi は ML や Lisp / Scheme などでプログラムするとき、効率を重視して末尾再帰でプログラムすることが多かったのですが、Haskell のような遅延評価を行う処理系では、末尾再帰にこだわる必要はないのかもしれません。

●フィボナッチ数列の生成

フィボナッチ数列も簡単に作ることができます。

リスト : フィボナッチ数列の生成

fibgen :: Integer -> Integer -> [Integer]
fibgen a b = a : fibgen b (a + b)

関数 fibgen の引数 a がフィボナッチ数列の最初の項で、b が次の項です。fibgen b (a + b) が評価されるたびに、フィボナッチ数列の新しい項が生成されていきます。Haskell の型 Integer は多倍長整数なので、メモリの許す限りフィボナッチ数列を生成することができます。

それでは実行してみましょう。

*Main> take 20 (fibgen 0 1)
[0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181]
*Main> take 40 (fibgen 0 1)
[0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181,6765,10946,17711,28657,
46368,75025,121393,196418,317811,514229,832040,1346269,2178309,3524578,5702887,9227465,
14930352,24157817,39088169,63245986]

*Main> fibs = fibgen 0 1
*Main> fibs !! 0
0
*Main> fibs !! 10
55
*Main> fibs !! 100
354224848179261915075

●遅延ストリームの連結

次は 2 つの遅延ストリームを結合することを考えてみましょう。Haskell の場合、リストは演算子 ++ で結合できますが、遅延ストリーム同士の結合はできません。簡単な例を示しましょう。

Prelude> ones = 1 : ones
Prelude> take 10 ones
[1,1,1,1,1,1,1,1,1,1]

Prelude> twos = 2 : twos
Prelude> take 10 twos
[2,2,2,2,2,2,2,2,2,2]

Prelude> take 20 (ones ++ twos)
[1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]

変数 ones は要素が 1 の遅延ストリーム (循環リスト)、twos は要素が 2 の遅延ストリームになります。これは関数 repeat を使っても実現できます。簡単な実行例を示しましょう。

Prelude> ones' = repeat 1
Prelude> take 10 ones'
[1,1,1,1,1,1,1,1,1,1]

Prelude> twos' = repeat 2
Prelude> take 20 twos
[2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2]

Prelude> take 20 (ones' ++ twos')
[1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]

ones ++ twos で生成した新しい遅延ストリームは、けっきょく左辺のストリームの要素しか取り出すことができないのです。

このような場合、ストリームの要素を交互に出力する関数を用意すると便利です。次のリストを見てください。

リスト : 遅延ストリームの要素を交互に出力

interleave :: [a] -> [a] -> [a]
interleave []     ys = ys
interleave (x:xs) ys = x : interleave ys xs

関数 interleave は 2 つのリスト xs, ys を受け取ります。そして、xs の要素 x をリストに格納したら、次は ys の要素をリストに格納します。これは interleave を再帰呼び出しするとき、引数 xs と ys の順番を交換するだけです。これで xs と ys の要素を交互に出力することができます。

簡単な実行例を示しましょう。

*Main> take 20 (interleave ones twos)
[1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2]

●高階関数

Haskell の場合、マッピングやフィルターなどの高階関数はそのまま遅延ストリームにも適用できます。ただし、畳み込み foldl と foldr を無限リストに適用するときには注意が必要です。基本的に foldl は無限リストに適用できません。foldr の場合、返り値がリストであれば無限リストにも適用することができます。簡単な例を示しましょう。

Prelude> take 20 (map (*2) [1..])
[2,4,6,8,10,12,14,16,18,20,22,24,26,28,30,32,34,36,38,40]

Prelude> take 20 (filter even [1..])
[2,4,6,8,10,12,14,16,18,20,22,24,26,28,30,32,34,36,38,40]

Prelude> take 20 (foldr (:) [] [1..])
[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]

なお、foldr は値を求める処理、たとえば foldr (+) 0 [1..] は当然ですが実行できません。

遅延ストリームを操作する場合、2 つの遅延ストリームを受け取るマップ関数 map2 があると便利です。プログラムは次のようになります。

リスト : マップ関数

map2 :: (a -> b -> c) -> [a] -> [b] -> [c]
map2 f (x:xs) (y:ys) = f x y : map2 f xs ys
map2 _ _      _      = []

リスト xs と ys からそれぞれの要素を取り出して関数 f に渡します。そして、その評価結果を map2 の返り値 (リスト) に格納します。どちらかのリストが空リストのときが再帰呼び出しの停止条件です。これで有限なリストにも無限の遅延ストリームにも対応することができます。なお、Haskell には同じ働きをする関数 zipWith が用意されています。

簡単な実行例を示しましょう。

*Main> map2 (+) [1,2,3,4,5] [6,7,8,9,10]
[7,9,11,13,15]
*Main> map2 (*) [1 ..] [6,7,8,9,10]
[6,14,24,36,50]
*Main> take 20 (map2 (*) [1 ..] [10 ..])
[10,22,36,52,70,90,112,136,162,190,220,252,286,322,360,400,442,486,532,580]

*Main> zipWith (+) [1,2,3,4,5] [6,7,8,9,10]
[7,9,11,13,15]
*Main> zipWith (*) [1 ..] [6,7,8,9,10]
[6,14,24,36,50]
*Main> take 20 (zipWith (*) [1 ..] [10 ..])
[10,22,36,52,70,90,112,136,162,190,220,252,286,322,360,400,442,486,532,580]

map2 (zipWith) を使うと、遅延ストリームに対していろいろな処理を定義することができます。次の例を見てください。

*Main> add_stream s1 s2 = zipWith (+) s1 s2
*Main> s3 = add_stream [1 ..] [100 ..]
*Main> take 20 s3
[101,103,105,107,109,111,113,115,117,119,121,123,125,127,129,131,133,135,137,139]

add_stream は s1 と s2 の要素を加算した遅延ストリームを返します。この add_stream を使うと、整数を生成する遅延ストリームは次のように定義することができます。

*Main> ints = 1 : add_stream ones ints
*Main> take 20 ints
[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]

*Main> fibs = 0 : 1 : add_stream (tail fibs) fibs
*Main> take 20 fibs
[0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181]

ストリーム ints は、現在の ints に 1 を足し算することで整数を生成しています。fibs は現在のフィボナッチ数列を表していて、tail fibs で次の要素を求め、それらを足し算することで、その次の要素を求めています。この場合、ストリームの初期値として 2 つの要素が必要になることに注意してください。

●組 (pair) を生成する遅延ストリーム

次は、2 つのストリームからその要素の組み合わせを生成するストリームを作りましょう。要素が n 個のストリームの場合、組み合わせは n * n 個あります。次の図を見てください。

(a0, b0) (a0, b1) (a0, b2) ... (a0, bn)
(a1, b0) (a1, b1) (a1, b2) ... (a1, bn)
(a2, b0) (a2, b1) (a2, b2) ... (a2, bn)

                           ...

(an, b0) (an, b1) (an, b2) ... (an, bn)


        図 : n * n 個の組

これは「直積集合」を求めることと同じです。Haskell の場合、リスト内包表記を使って次のようにプログラムできます。

リスト : 組の生成 (1)

pair_stream :: [a] -> [b] -> [(a, b)]
pair_stream xs ys = [(x, y) | x <- xs, y <- ys]

実行例を示します。

*Main> pair_stream [1..4] [5..8]
[(1,5),(1,6),(1,7),(1,8),(2,5),(2,6),(2,7),(2,8),(3,5),(3,6),(3,7),(3,8),(4,5),(4,6),(4,7),(4,8)]

ところが、この方法では無限リストに対応できません。実際、ys に無限リストを渡した場合、xs の最初の要素を a0 とすると (a0, ys の要素) という組しか生成されません。実際に試してみましょう。

*Main> take 10 (pair_stream [1..4] [5..])
[(1,5),(1,6),(1,7),(1,8),(1,9),(1,10),(1,11),(1,12),(1,13),(1,14)]

そこで、下図に示すように、対角線上に組を生成していくことにします。

   | a0  a1  a2  a3  a4  a5
---+-----------------------------
b0 | 0   1   3   6   10  15  ...
   |
b1 | 2   4   7   11  16  ...
   |
b2 | 5   8   12  17  ...
   |
b3 | 9   13  18  ...
   |
b4 | 14  19  ...
   |
b5 | 20 ...
   |
   | ...
   |


図 : 無限ストリームによる組の生成

図を見ればおわかりのように、対角線の要素数を n とすると、組は (an-1, b0), (an-2, b1), ..., (a1, bn-2), (a0, bn-1) となっています。これは、xs から n 個の要素を取り出したリストと、ys から n 個の要素を取り出して反転したリストを zip でまとめた形になっています。プログラムは次のようになります。

リスト : 組の生成 (2)

pair_stream' :: [a] -> [b] -> [(a, b)]
pair_stream' xs ys = makePair 1 xs ys where
  makePair n xs ys = zip (take n xs) (reverse (take n ys)) ++ makePair (n + 1) xs ys

実際の処理は局所関数 makePair で行っています。引数 n が対角線上の要素数を表します。take で xs と ys から要素を取り出し、ys から取り出したリストを reverse で反転してから zip でタプルに格納します。そして、再帰呼び出しした makePair の返り値にそのリストを連結します。これで無限リストに対応することができます。

それでは実行してみましょう。

*Main> take 50 (pair_stream' [1..] [1..])
[(1,1),(1,2),(2,1),(1,3),(2,2),(3,1),(1,4),(2,3),(3,2),(4,1),(1,5),(2,4),(3,3),
 (4,2),(5,1),(1,6),(2,5),(3,4),(4,3),(5,2),(6,1),(1,7),(2,6),(3,5),(4,4),(5,3),
 (6,2),(7,1),(1,8),(2,7),(3,6),(4,5),(5,4),(6,3),(7,2),(8,1),(1,9),(2,8),(3,7),
 (4,6),(5,5),(6,4),(7,3),(8,2),(9,1),(1,10),(2,9),(3,8),(4,7),(5,6)]

*Main> s1 = pair_stream' [1..] [1..]
*Main> s1 !! 10
(1,5)
*Main> s1 !! 49
(5,6)

正常に動作していますね。

●集合演算

GHC のモジュール Data.List には、リストを集合とみなして、和集合と積集合を求める関数 union と intersect が用意されています。簡単な使用例を示します。

Prelude> :m Data.List
Prelude Data.List> :t union
union :: Eq a => [a] -> [a] -> [a]
Prelude Data.List> :t intersect
intersect :: Eq a => [a] -> [a] -> [a]

Prelude Data.List> union [1,2,3,4] [3,4,5,6]
[1,2,3,4,5,6]
Prelude Data.List> union [] [3,4,5,6]
[3,4,5,6]
Prelude Data.List> union [1,2,3,4] []
[1,2,3,4]
Prelude Data.List> union [1,2,3,4] [5,6,7,8]
[1,2,3,4,5,6,7,8]

Prelude Data.List> intersect [1,2,3,4] [3,4,5,6]
[3,4]
Prelude Data.List> intersect [] [3,4,5,6]
[]
Prelude Data.List> intersect [1,2,3,4] []
[]
Prelude Data.List> intersect [1,2,3,4] [5,6,7,8]
[]

union と intersect は無限ストリームに対応していませんが、無限ストリームには重複要素が存在せず、要素は昇順に出力されることを前提にすると、無限ストリームでも集合演算を行うことができます。次のリストを見てください。

リスト : 集合演算

-- 和集合
union' :: Ord a => [a] -> [a] -> [a]
union' [] ys = ys
union' xs [] = xs
union' xs1@(x:xs) ys1@(y:ys)
  | x == y    = x : union' xs ys
  | x < y     = x : union' xs ys1
  | otherwise = y : union' xs1 ys

-- 積集合
intersect' :: Ord a => [a] -> [a] -> [a]
intersect' [] _  = []
intersect' _  [] = []
intersect' xs1@(x:xs) ys1@(y:ys)
  | x == y    = x : intersect' xs ys
  | x < y     = intersect' xs ys1
  | otherwise = intersect' xs1 ys

union' は引数のリストから要素 x, y を取り出して、小さいほうをリストに追加します。等しい場合は要素をひとつだけ追加します。このとき、両方の引数から先頭要素を取り除くことに注意してください。

intersect’ も簡単です。リストの先頭要素を比較して、等しい場合はその要素をリストに追加します。x が y よりも小さい場合は、x を取り除いて次の要素を調べます。y が小さい場合は y を取り除いて次の要素を調べます。

簡単な実行例を示しましょう。

*Main> s1 = map (\x -> x * (x + 1) `div` 2) [1..]
*Main> s2 = map (^2) [1..]
*Main> take 10 s1
[1,3,6,10,15,21,28,36,45,55]
*Main> take 10 s2
[1,4,9,16,25,36,49,64,81,100]

*Main> take 20 $ union' s1 s2
[1,3,4,6,9,10,15,16,21,25,28,36,45,49,55,64,66,78,81,91]

*Main> take 6 $ intersect' s1 s2
[1,36,1225,41616,1413721,48024900]

遅延ストリーム s1 は「三角数」、s2 は「四角数」を表します。これらの遅延ストリームを union' でまとめると、三角数または四角数の数列になります。intersect' でまとめると、三角数かつ四角数の数列 (平方三角数) になります。平方三角数は拙作のページ Puzzle DE Progamming 多角数 でも取り上げています。興味のある方はお読みくださいませ。

●ハミングの問題

ここで union' を使うと簡単に解ける問題を紹介しましょう。

[ハミングの問題]

7 以上の素数で割り切れない正の整数を小さい順に N 個求めよ

参考文献 : 奥村晴彦,『C言語による最新アルゴリズム事典』, 技術評論社, 1991 (361 ページより引用)

7 以上の素数で割り切れない正の整数は、素因子が 2, 3, 5 しかない自然数のことで、これを「ハミング数 (Hamming Numbers)」といいます。ハミング数は素因数分解したとき、2i * 3j * 5k (i, j, k >= 0) の形式になります。たとえば、100 以下のハミング数は次のようになります。

1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20, 24, 25, 27, 30, 32, 36, 40, 45, 48, 50, 
54, 60, 64, 72, 75, 80, 81, 90, 96, 100

遅延ストリームを使うと「ハミングの問題」は簡単に解くことができます。小さい順にハミング数を出力する遅延ストリームを hs としましょう。hs は 1 から始まるので次のように定義できます。

hs = 1 : ...

最初の要素は 1 なので、それに 2, 3, 5 を掛け算した値 (2, 3, 5) もハミング数になります。この値は次の式で生成することができます。

(map (*2) hs)
(map (*3) hs)
(map (*5) hs)

あとは、これらの遅延ストリームを union' でひとつにまとめれば、ハミング数を小さい順に出力することができます。

実行結果を示します。

*Main> hs = 1 : union' (map (*2) hs) (union' (map (*3) hs) (map (*5) hs))
*Main> take 100 hs
[1,2,3,4,5,6,8,9,10,12,15,16,18,20,24,25,27,30,32,36,40,45,48,50,54,60,64,72,75,
 80,81,90,96,100,108,120,125,128,135,144,150,160,162,180,192,200,216,225,240,243,
 250,256,270,288,300,320,324,360,375,384,400,405,432,450,480,486,500,512,540,576,
 600,625,640,648,675,720,729,750,768,800,810,864,900,960,972,1000,1024,1080,1125,
 1152,1200,1215,1250,1280,1296,1350,1440,1458,1500,1536]

●素数列の生成

次は無限の素数列を生成するプログラムを作ってみましょう。整数 n が素数か確かめる簡単な方法は、√n 以下の素数で割り切れるか試してみることです。割り切れる素数があれば、n は素数ではありません。そうでなければ、n は素数であることがわかります。

これをそのままプログラムすると次のようになります。

リスト : 素数列の生成

checkPrime :: Integer -> Bool
checkPrime n =
  all (\x -> mod n x /= 0) (takeWhile (\x -> x * x <= n) primes)

primesFrom :: Integer -> [Integer]
primesFrom n
  | checkPrime n = n : primesFrom (n + 2)
  | otherwise    = primesFrom (n + 2)

primes = 2 : 3 : 5 : primesFrom 7

変数 primes は無限の素数列を表します。実際に素数を生成する処理は関数 primesFrom で行います。primesFrom は関数 checkPrime を呼び出して n が素数かチェックします。素数であれば primsFrom を再帰呼び出しして、その返り値に n を追加します。そうでなければ primesFrom を再帰呼び出しするだけです。偶数は素数ではないので、引数 n には奇数を与えていることに注意してください。

checkPrime も簡単です。takeWhile で primes から √n 以下の素数列を取り出します。√n 以下の素数は生成済みなので、primes から takeWhile で取り出すことが可能です。ここでは√n のかわりに条件を x * x <= n としています。あとは、関数 all を使って、取り出した素数で n が割り切れないことを確認するだけです。

それでは実行してみましょう。

*Main> take 25 primes
[2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97]
*Main> primes !! 99
541
*Main> primes !! 500
3581

100 以下の素数は全部で 25 個あります。また、100 番目の素数は 541 になります。Haskell のリストは 0 から数えるので、primes !! 99 で 100 番目の素数になります。

●エラトステネスの篩

もうひとつ素数列を生成する簡単な方法を紹介しましょう。考え方は簡単です。最初に、2 から始まる整数列を生成するストリームを用意します。2 は素数なので、素数ストリームの要素になります。次に、この整数列から 2 で割り切れる整数を取り除き除きます。これは filter を使うと簡単です。

2 で割り切れる整数が取り除かれたので、次の要素は 3 になります。今度は 3 で割り切れる整数を取り除けばいいのです。これも filter を使えば簡単です。このとき、入力用のストリームは 2 で割り切れる整数が取り除かれています。したがって、このストリームに対して 3 で割り切れる整数を取り除くように filter を設定すればいいわけです。

このように、素数を見つけたらそれで割り切れる整数を取り除いていくアルゴリズムを「エラトステネスの篩」といいます。ようするに、2 から始まる整数ストリームに対して、見つけた素数 2, 3, 5, 7, 11, ... を順番に fiter で設定して素数でない整数をふるい落としていくわけです。

プログラムは次のようになります。
リスト : 素数列の生成

primes' = 2 : sieve [3, 5 ..] where
  sieve (x:xs) = x : sieve (filter (\y -> y `mod` x /= 0) xs)

primes' は素数列を表します。先頭が 2 で、その後の素数列を局所関数 sieve で生成します。sieve には 3 から始まる奇数列を渡します。2 から始まる整数列を渡すよりも、このほうが効率的です。sieve を再帰呼び出しするとき、filter でリストから先頭要素 x で割り切れる数を filter で取り除きます。最初に奇数列から 3 で割り切れる整数を取り除いたリストが返されます。次の要素 5 を取り出すとき、このリストに対して 5 で割り切れる整数を取り除くことになるので、3 と 5 で割り切れる整数が取り除かれることになります。次の要素は 7 になりますが、そのストリームからさらに 7 で割り切れる整数が filter で取り除かれることになります。

このように filter を重ねて設定していくことで、素数でない整数をふるい落としていくことができるわけです。それでは実行してみましょう。

*Main> take 25 primes'
[2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97]
*Main> primes' !! 99
541

正常に動作していますね。ところで、primes' の実行速度は primes よりもかなり遅いです。興味のある方は primes' の高速化に挑戦してみてください。

●参考 URL

素数列を生成するプログラムは Gauche ユーザリファレンス: 6.19 遅延評価 を参考にさせていただきました。Shiro Kawai さんに感謝いたします。


初版 2013 年 1 月 27 日
改訂 2021 年 1 月 11 日

Copyright (C) 2013-2021 Makoto Hiroi
All rights reserved.

[ PrevPage | Haskell | NextPage ]