M.Hiroi's Home Page

お気楽 Ruby プログラミング入門

番外編 : 継続渡しスタイル

[ PrevPage | R u b y | NextPage ]

はじめに

今回は「継続渡しスタイル」について説明します。Scheme には「継続 (continuation)」という強力な機能があります。継続を簡単に説明すると、「プログラムの実行状態を保存し、あとから実行を再開することができる機能」となります。Scheme 以外で継続を扱うことができるプログラミング言語はほとんどありませんが、SML/NJ や OCaml では拡張機能を使って「継続」を取り扱うことができます。

Ruby も継続を扱うことができる数少ない言語の一つですが、ver 1.9 からは継続を使用するために require 'continuation' が必要になりました。ver 3.0 では警告メッセージ callcc is obsolete; use Fiber instead が表示されます。継続を積極的に使用することはあまり歓迎されていないのでしょう。近い将来、継続のサポートは打ち切られるかもしれません。それでも、継続を操作できるのは他の言語にはない Ruby の特徴の一つです。継続を勉強する (または試してみる) のに Ruby を使うのはよい方法だと思います。

継続は使いこなすのが難しい機能だといわれています。継続と同様の考え方に「継続渡しスタイル (Continuation Passing Style : CPS)」があります。CPS はクロージャを使った汎用的な方法なので、クロージャをサポートしている言語であれば、CPS でプログラムを作ることが可能です。CPS も慣れるまでちょっと苦労しますが、継続よりはわかりやすいと思います。継続の前に CPS から説明することにしましょう。

●継続とは?

最初に継続について簡単に説明します。継続は「次に行われる計算」のことです。たとえば、次のプログラムを例に考えてみましょう。

irb> def foo
irb>   print "foo"
irb> end
=> :foo
irb> def bar
irb>   print "bar"
irb> end
=> :bar
irb> def baz
irb>   print "baz"
irb> end
=> :baz
irb> def test
irb>   foo
irb>   bar
irb>   baz
irb> end
=> :test
irb> test
foobarbaz=> nil

関数 test() は関数 foo(), bar(), baz() を順番に呼び出します。foo() の次に実行される処理は bar(), baz() の関数呼び出しです。この処理が foo() を呼び出したあとの「継続」になります。同様に、bar() のあとに実行されるのは baz() の呼び出しで、この処理がこの時点での「継続」になります。baz() を呼び出したあと、test() の中では次に実行する処理はありませんが、test() は関数呼び出しされているので、関数呼び出しから元に戻る処理が baz() を呼び出したあとの「継続」になります。

このように、あるプログラムを実行しているとき、そのプログラムを終了するまでには「次に実行する処理 (計算)」が必ず存在します。一般に、この処理 (計算) のことを「継続」といいます。Ruby や Scheme の場合、次の計算を続行するための情報を取り出して、それを保存することができます。Ruby や Scheme では、この保存した情報を「継続」といって、通常のデータ型と同様に取り扱うことができます。つまり、継続を変数に代入したり関数の引数に渡すことができるのです。継続を使うとプログラムの実行を途中で中断し、あとからそこに戻ってプログラムの実行を再開することができます。

●継続渡しスタイルとは?

一般のプログラミング言語では、Ruby や Scheme のように継続を取り出して保存することはできません。そこで、継続 (次に行う処理) を関数 (クロージャ) で表して、それを引数に渡して実行することにします。これを「継続渡しスタイル (CPS)」といいます。Ruby の場合はブロックを使うと簡単です。次の例を見てください。

irb> def test_cps
irb>   foo
irb>   bar
irb>   yield
irb> end
=> :test_cps
irb> test_cps do baz end
foobarbaz=> nil

関数 test_cps() は foo(), bar() を呼び出したあと、yield でブロックを実行します。関数 baz() を渡せば foo, bar, baz と表示されますし、他の処理を渡せばそれを実行することができます。

もう一つ簡単な例を示しましょう。継続に値を渡して処理を行うこともできます。

irb> def add_cps(x, y)
irb>   yield(x + y)
irb> end
=> :add_cps
irb> add_cps(1, 2) do |x| x end
=> 3
irb> add_cps(10, 20) do |x| print x end
30=> nil

関数 add_cps() は引数 a と b を加算して、その結果を継続 (ブロック) に渡します。ブロックで引数 x をそのまま返すと、計算結果を返すことができます。また、ブロックで print x を実行すれば、計算結果を表示することができます。

●再帰呼び出しと CPS

CPS を使うと再帰呼び出しを末尾再帰に変換することができます。たとえば、階乗の計算を CPS でプログラムすると次のようになります。

リスト : 階乗の計算 (CPS)

def fact_cps(n)
  if n == 0
    yield 1
  else
    fact_cps(n - 1) do |x|
      yield(n * x)
    end
  end
end

ブロックが継続を表します。n が 0 のときは、yield に階乗の値 1 を渡します。それ以外の場合は、階乗の計算を継続の処理にまかせて関数 fact_cps() を再帰呼び出します。ここで、fact_cps() の呼び出しは末尾再帰になることに注意してください。

継続の処理 do |x| yield(n * x) end では、継続の引数 x と fact_cps() の引数 n を掛け算し、その結果を yield に渡します。この継続の中で階乗の式が組み立てられていきます。n の階乗を求めるとき、継続を表すブロックの引数 x には (n - 1)! の値が渡されることに注意してください。そして、n が 0 のとき継続に引数 1 を渡して実行すると、今まで組み立てられた式が計算されて階乗の値を求めることができます。

それでは実際に実行してみましょう。

リスト : 階乗のテスト

def test_fact
  20.times do |n|
    fact_cps(n) do |x|
      print x, "\n"
    end
  end
end
irb> test_fact
1
2
6
24
120
720
5040
40320
362880
3628800
39916800
479001600
6227020800
87178291200
1307674368000
20922789888000
355687428096000
6402373705728000
121645100408832000

●二重再帰と CPS

次はフィボナッチ数列を求める関数を CPS で作りましょう。フィボナッチ関数は階乗と同様に再帰的に定義される関数です。

\( fibo(n) = \begin{cases} 0 & if \ n = 0 \\ 1 & if \ n = 1 \\ fibo(n - 1) + fibo(n - 2) \quad & if \ n \gt 1 \end{cases} \)

0, 1, 1, 2, 3, 5, 8, 13 .... という直前の 2 項を足していく数列

フィボナッチ関数も再帰呼び出しを使えば簡単にプログラムできます。次のリストを見てください。

リスト : フィボナッチ関数

# 二重再帰
def fibo(n)
  if n < 2
    n
  else
    fibo(n - 1) + fibo(n - 2)
  end
end

# CPS
def fibo_cps(n)
  if n < 2
    yield n
  else
    fibo_cps(n - 1) do |x|
      fibo_cps(n - 2) do |y|
        yield(x + y)
      end
    end
  end
end

関数 fibo_cps() は、引数 n が 0 または 1 のとき yield n を実行します。それ以外の場合は fibo_cps() を再帰呼び出しします。n - 1 の値が求まると、その値はブロックの引数 x に渡されます。この中で、今度は n - 2 の値を求めます。すると、その値はブロックの引数 y に渡されます。したがって、fibo_cps n の値は x + y で求めることができ、この値を fibo_cps n のブロック yield に渡せばいいわけです。

それでは実際に実行してみましょう。

リスト : フィボナッチ関数のテスト

def test_fibo
  20.times do |n|
    fibo_cps(n) do |x|
      print x, "\n"
    end
  end
end
irb> test_fibo
0
1
1
2
3
5
8
13
21
34
55
89
144
233
377
610
987
1597
=> エラー 
stack level too deep (SystemStackError)

残念ながら、単純な二重再帰 (fibo) で求めることができる値でも、fibo_cps ではエラーになってしまいました。Ruby はデフォルトでは末尾再帰最適化を行わないので、CPS との相性はあまりよくないようです。末尾再帰最適化については Appendix : 末尾再帰と繰り返し にまとめました。興味のある方はお読みくださいませ。

●CPS の便利な使い方

階乗やフィボナッチ関数の場合、CPS に変換するメリットはありませんが、場合によっては CPS の方が簡単にプログラムできることもあります。たとえば、入れ子になった配列を平坦化する関数 flatten() において、配列の要素に空の配列 [ ] が含まれていたら [ ] を返すようにプログラムすることを考えてみましょう。

最初に flatten() について簡単に説明します。入れ子になっている配列の中から要素を取り出して、それを一つの配列にまとめます。このような処理を「平坦化」とか「平滑化」といいます。Ruby のクラス Array には平坦化を行うメソッド flatten(), flatten!() が用意されています。

配列の平坦化は、二重再帰を使うと簡単にプログラムできます。次のリストを見てください。

リスト : 配列の平坦化

def flatten(a)
  if a == []
    []
  elsif not a.instance_of?(Array)
    [a]
  else
    flatten(a[0]) + flatten(a[1..-1])
  end
end

引数の配列 a が [ ] であれば [ ] を返します。次に a が配列でなければ、それを配列に格納して返します。a が配列の場合は、配列の先頭の要素を平坦化し、残りの要素を平坦化して、その結果を演算子 + で結合します。ここで、[ ] + [1, 2, 3] と [1, 2, 3] + [ ] は [1, 2, 3] になることに注意してください。したがって、配列 a の要素に [ ] があっても、それが返り値のリストに含まれることはありません。

簡単な実行例を示しましょう。

irb> flatten([1, [2, [3, 4, 5], 6, 7], 8, 9])
=> [1, 2, 3, 4, 5, 6, 7, 8, 9]
irb> flatten([1, [2, [3, 4, [], 5], 6, 7], 8, 9])
=> [1, 2, 3, 4, 5, 6, 7, 8, 9]

2 番目の例のように、flatten() は [ ] を取り除く動作になります。

それでは、配列の要素に [ ] があれば、[ ] を返すように flatten() を修正してみましょう。つまり、2 番目の例で flatten() の返り値は [ ] になります。次のリストを見てください。

リスト : flatten の修正 (間違い)

def flatten1(a)
  if a == []
    []
  elsif not a.instance_of?(Array)
    [a]
  elsif a[0] == []
    []
  else
    flatten1(a[0]) + flatten1(a[1..-1])
  end
end

関数 flatten1() は a[0] が [ ] ならば [ ] を返していますが、これでは正常に動作しません。実際に試してみると次のようになります。

irb> flatten1([1, [2, [], [3, 4, 5], 6, 7], 8, 9])
=> [1, 2, 8, 9]

この場合、[ ] を返したいのですが、その前の要素 1, 2 を連結した配列を返し、そのあとの処理も行っています。[ ] を見つける前に配列の連結処理を行っているので、[ ] を見つけたら処理を破棄して、ただちに [ ] を返さないといけないのです。

このような場合、CPS でプログラムすると簡単です。次のリストを見てください。

リスト : 配列の平坦化 (CPS)

# flatten の CPS 化
def flatten_cps(a)
  if a == []
    yield []
  elsif not a.instance_of?(Array)
    yield [a]
  else
    flatten_cps(a[0]) do |x|
      flatten_cps(a[1..-1]) do |y|
        yield(x + y)
      end
    end
  end
end

# flatten1 の CPS 化
def flatten_cps1(a)
  if a == []
    yield []
  elsif not a.instance_of?(Array)
    yield [a]
  elsif a[0] == []
    []
  else
    flatten_cps1(a[0]) do |x|
      flatten_cps1(a[1..-1]) do |y|
        yield(x + y)
      end
    end
  end
end

flatten() を CPS に変換するのは簡単です。引数 a が [ ] の場合は yield [ ] を、a が配列でなければ yield [a] を実行します。次に、flatten_cps() を再帰呼び出しして a[0] を平坦化します。その結果は継続の引数 x に渡されます。その継続の中で flatten_cps() を再帰呼び出しして残りの配列を平坦化し、その結果を継続の引数 y に渡します。あとは x と y を連結して、その結果を yield に渡せばいいわけです。

flatten_cps1() も簡単です。a[0] が [ ] の場合は [ ] をそのまま返す処理を追加するだけです。この場合、継続は実行されないので、配列の連結処理は行われず、[ ] をそのまま返すことができます。

それでは実行してみましょう。

irb> flatten_cps([1, [2, [], [3, 4, 5], 6, 7], 8, 9]) do |x| x end
=> [1, 2, 3, 4, 5, 6, 7, 8, 9]
irb> flatten_cps1([1, [2, [], [3, 4, 5], 6, 7], 8, 9]) do |x| x end
=> []

正常に動作していますね。

●高階関数と CPS

次は高階関数を CPS でプログラムすることを考えてみましょう。配列の平坦化とマッピングを行う関数 flatmap() を考えます。flatmap() は CPS でなければ次のようにプログラムできます。

リスト : 配列の平坦化とマッピング

def flatmap(fn, a)
  if a == []
    []
  elsif not a.instance_of?(Array)
    [fn.call(a)]
  else
    flatmap(fn, a[0]) + flatmap(fn, a[1..-1])
  end
end

簡単な実行例を示します。

irb> flatmap(proc {|x| x * x}, [[1, 2], [3, 4], [5, 6], [7, 8]])
=> [1, 4, 9, 16, 25, 36, 49, 64]

これを CPS に変換します。引数 fn に渡される関数も CPS で書かれていて、継続はブロックで渡されるものとします。プログラムは次のようになります。

リスト : flatmap (CPS)

def flatmap_cps(fn, a)
  if a == []
    yield []
  elsif not a.instance_of?(Array)
    fn.call(a) do |x|
      yield [x]
    end
  else
    flatmap_cps(fn, a[0]) do |x|
      flatmap_cps(fn, a[1..-1]) do |y|
        yield(x + y)
      end
    end
  end
end

ポイントは関数 fn に渡す継続 (ブロック) です。関数 fn の返り値は継続の引数 x に渡されます。あとは [x] を継続 yield に渡すだけです。

簡単な実行例を示します。

irb> flatmap_cps(proc {|x, &fn| fn.call(x * x)}, [[1, 2],[3, 4],[5, 6],[7, 8]]) do |x| x end
=> [1, 4, 9, 16, 25, 36, 49, 64]

CPS に変換すると、高階関数の処理を途中でやめることもできます。簡単な例として、要素が数値の配列を受け取り、要素を 2 乗した平坦化した配列を返す関数 foo を作ってみましょう。このとき、要素が負の場合は処理を中断して空の配列を返すものとします。

このよな処理は flatmap_cps() で簡単にプログラムすることができます。

リスト : 高階関数からの脱出

def foo(a)
  flatmap_cps(
    proc {|x, &fn|
      if x < 0
        []
      else
        fn.call(x * x)
      end
    },
    a
  ) do |x| x end
end

CPS の場合、処理を続ける場合は継続 (fn) を呼び出し、処理を中断するときは値を返すだけで実現できます。

簡単な実行例を示しましょう。

irb> foo([1, 2, [3, 4, [5, 6], 7, 8], 9, 10])
=> [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
irb> foo([1, 2, [3, 4, [5, -6], 7, 8], 9, 10])
=> []

正常に動作していますね。

●CPS による木の巡回

次は配列を「木」とみなして、木を巡回するプログラムを作ってみましょう。ここでは、配列を節 (node) とし要素を葉 (leaf) と考えます。木を巡回するプログラムは簡単です。次のリストを見てください。

リスト : 木の巡回

def for_each_tree(fn, a)
  if a == []
    []
  elsif a.instance_of?(Array)
    for_each_tree(fn, a[0])
    for_each_tree(fn, a[1..-1])
  else
    fn.call(a)
  end
end

関数 for_each_tree() は木 a を巡回して、各要素に関数 fn を適用します。for_each_tree() は関数 fn の副作用が目的なので、返り値に意味はありません。a が [ ] ならば何もせずに [ ] を返します。a が配列でなければ葉なので、a に関数 fn を適用します。あとは、a を先頭要素と残りの配列に分解して、for_each_tree() を再帰呼び出しするだけです。

このプログラムを CPS に変換すると、次のようになります。

リスト : 木の巡回 (CPS)

def for_each_tree_cps(fn, a)
  if a == []
    yield
  elsif not a.instance_of?(Array)
    fn.call(a)
    yield
  else
    for_each_tree_cps(fn, a[0]) do |x|
      for_each_tree_cps(fn, a[1..-1]) do |y|
        yield
      end
    end
  end
end

for_each_tree_cps() は副作用が目的なので、継続に値を渡す必要はありません。a が空リストの場合は yield を呼び出します。a が葉の場合は fn を適用してから yield を呼び出します。次に、for_each_tree_cps() を再帰呼び出しして先頭要素の部分木をたどり、その継続の中で残りの部分木をたどります。そして、その継続の中で yield を呼び出します。これで生成された継続を呼び出して、木を巡回することができます。

それでは実際に試してみましょう。

irb> for_each_tree_cps(proc {|x| print x, "\n"}, [1, [2, [3, 4], 5], 6]) do nil end
1
2
3
4
5
6

このように、木を巡回して各要素に関数 fn を適用することができます。

●CPS による継続の保存と実行の再開

木の巡回を CPS に変換すると、遅延評価を使って「ジェネレータ (generator)」と同様な動作を行わせることができます。拙作のページ Ruby 入門 第 9 回 のコラム「遅延評価」で説明したように、Ruby は手続きオブジェクトを使って遅延評価を行うことができます。

次のリストを見てください。

リスト : 木の巡回 (ジェネレータ)

def for_each_tree_gen(a)
  if a == []
    yield
  elsif not a.instance_of?(Array)
    return a, proc { yield }
  else
    for_each_tree_gen(a[0]) do |x|
      for_each_tree_gen(a[1..-1]) do |y|
        yield
      end
    end
  end
end

for_each_tree_gen() は木を巡回してその要素を順番に出力します。要素を返すとき、継続を手続きオブジェクトに格納していっしょに返すところがポイントです。このように、手続きオブジェクトを使って CPS の継続を保存することで、プログラムの実行を一時的に中断することができます。そして、その手続きオブジェクトを call することでプログラムの実行を再開し、次の要素を求めることができます。

なお、for_each_tree_gen() を呼び出すときに渡す継続が一番最後に呼び出されるので、終端を表す値 (たとえば nil や false など) を返すようにプログラムしてください。

簡単な実行例を示しましょう。

irb> a = for_each_tree_gen([1,[2,[3,4,5],6,7],8,9]) do nil end
=> [1, #<Proc: ... >]
irb> while a
irb>   print a[0], "\n"
irb>   a = a[1].call
irb> end
1
2
3
4
5
6
7
8
9

正常に動作していますね。ただし、このままではちょっと使いにくいので、次に示すようなクラスを作ってみました。

リスト : 簡単な遅延ストリーム

class Stream
  def initialize(stream)
    @stream = stream
  end

  def next
    if @stream
      v = @stream[0]
      @stream = @stream[1].call
    else
      v = nil
    end
    v
  end

  def empty?
    not @stream
  end
end

クラス名は Stream としました。メソッド next() で要素を順番に取り出し、メソッド empty?() で要素がなくなったかチェックします。それでは実行してみましょう。

irb> a = Stream.new(for_each_tree_gen([1,[2,[3,4,5],6,7],8,9]) do nil end)
=> #<Stream: ... >
irb> while not a.empty?
irb>   print a.next, "\n"
irb> end
1
2
3
4
5
6
7
8
9

一般に、データの流れを抽象化したデータ構造を「ストリーム (stream)」と呼びます。たとえば、ファイル入出力はストリームと考えることができます。また、配列を使ってストリームを表すこともできます。ただし、単純な配列では有限個のデータの流れしか表すことができません。ところが、遅延評価を用いると擬似的に無限個のデータを表すことができます。これを「遅延ストリーム」と呼びます。

遅延ストリームの基本的な考え方は、必要になったときに新しいデータを生成することです。このときに遅延評価を用います。具体的にはデータを生成する関数を用意し、それを遅延評価してストリームに格納しておきます。そして、必要になった時点で遅延評価しておいた関数を呼び出して、新しいデータを求めればよいわけです。今回作成したプログラムは遅延ストリームと同じ動作です。遅延ストリームは別の回で取り上げてみたいと思っています。


●Appendix : 末尾再帰と繰り返し

ここで「末尾再帰」について考えてみましょう。末尾再帰の「末尾」とは、関数の最後で行われる処理のことです。とくに末尾で関数を呼び出すことを「末尾呼び出し (tail call)」といいます。関数を呼び出す場合、返ってきたあとに行う処理のため、必要な情報を保存しておかなければいけません。ところが、末尾呼び出しはそのあとに実行する処理がありません。呼び出したあと元に戻ってくる必要さえないのです。

このため、末尾呼び出しはわざわざ関数を呼び出す必要はなく、アセンブリ言語のような低水準のレベルではジャンプ命令に変換することができます。これを「末尾呼び出し最適化 (tail call optimization)」とか「末尾最適化」といいます。とくに末尾再帰は末尾で自分自身を呼び出しているので、関数の中で繰り返しに変換することができます。

また、相互再帰やもっと複雑な再帰呼び出しの場合でも、末尾最適化を適用することで、繰り返しに変換できる場合もあります。このように、再帰プログラムを繰り返しに変換してから実行することを「末尾再帰最適化 (tail recursion optimization)」といいます。厳密にいうと末尾最適化なのですが、一般的には末尾再帰最適化と呼ばれることが多いようです。

末尾再帰最適化を行うプログラミング言語、たとえば Scheme の場合、次に示すような関数呼び出しは、スタックを消費せずに実行することができます。処理系は Gauche を使いました。

gosh> (define (foo) (foo))
foo
gosh> (foo)
=> 無限ループになる

これに対し、Ruby はデフォルトではスタックオーバーフローになります。

irb> def foo
irb>   foo
irb> end
=> :foo
irb> foo
(irb):?:in `foo': stack level too deep (SystemStackError)

もうひとつ簡単な例を示しましょう。C言語で階乗を計算する関数 fact() を作ります。

リスト : 末尾再帰を繰り返しに変換する (C言語)

/* 末尾再帰 */
int fact(int n, int a)
{
  if(n == 0){
    return a;
  } else {
    return fact(n - 1, a * n);
  }
}

/* 繰り返し */
int facti(int n, int a)
{
loop:
  if(n == 0) return a;
  a *= n;
  n--;
  goto loop;
}

fact() は末尾再帰になっています。これを繰り返しに変換すると facti() のようになります。引数 n と a の値を保存する必要がないので、n と a の値を書き換えてから goto 文で先頭の処理へジャンプするだけです。なお、最近はC言語でも末尾再帰最適化を行う処理系 (GCC など) があるようです。

●末尾再帰をスタックオーバーフローせずに実行する

Ruby の場合、末尾再帰よりも繰り返しでプログラムするのが普通だと思いますが、CPS は末尾再帰でプログラムしたほうが簡単です。この場合、手続きオブジェクトを用いることで、末尾再帰を繰り返しのように実行する方法があります。末尾再帰の場合、再帰呼び出しのあとに行う処理は存在せず、関数の返り値をそのまま返すだけです。この返り値のかわりに、関数呼び出しの部分を手続きオブジェクトに格納して返すこともできます。

ようするに、手続きオブジェクトを使って遅延評価するわけです。ここで実行中の処理を中断することができます。そして、その手続きオブジェクトを call すると、中断された処理を再開することができます。この場合、実行を再開するために必要な情報は手続きオブジェクトに保存されます。この方法は kiyoka さんのブログ kiyoka.2010_06_30 (リンク切れ) を参考にさせていただきました。kiyoka さんに感謝いたします。

簡単な例を示しましょう。階乗を末尾再帰でプログラムすると次のようになります。

リスト : 階乗 (末尾再帰)

def facti(n, a = 1)
  return a if n == 0
  facti(n - 1, a * n)
end

facti() は末尾で再帰呼び出しされているので末尾再帰になっています。facti() の呼び出しを手続きオブジェクトに格納すると、次のようになります。

リスト : 階乗 (末尾再帰)

def facti(n, a = 1)
  return a if n == 0
  proc { facti(n - 1, a * n) }
end

これで n が 0 のときは a の値を返し、それ以外の場合は手続きオブジェクトを返すようになります。手続きオブジェクトの中には実行に必要な情報 n - 1 と a * n が保存されることに注意してください。たとえば、facti(4, 1) を実行すると n = 3, a = 4 を格納した手続きオブジェクトが返されます。

実際に値を求めるには、次の関数を使います。

リスト : 手続きオブジェクトを繰り返し実行する

def trcall(value)
  while value.instance_of?(Proc)
    value = value.call
  end
  value
end
irb> 12.times {|n| print trcall(facti(n)), "\n"}
1
1
2
6
24
120
720
5040
40320
362880
3628800
39916800

引数 value が手続きオブジェクトであれば、それを call で実行します。そうでなければ while ループを抜けて value を返します。たとえば、trcall(facti(4, 1)) を実行すると、次のようになります。

trcall(facti(4, 1)) => trcall(#<Proc: facti(3, 4) #>)

call => #<Proc: facti(2, 12) #>
call => #<Proc: facti(1, 24) #>
call => #<Proc: facti(0, 24) #>
call => 24

    図 : trcall(facti(4, 1)) の実行

このように、trcall の中で手続きオブジェクトを繰り返し呼び出すことで階乗の値を求めることができます。

●CPS での実行

CPS も同じ方法で繰り返しのように実行することができます。この場合、手続きオブジェクトをそのまま使うよりも、次のようなクラスを定義したほうがわかりやすいプログラムになるでしょう。

リスト : 遅延評価

class Delay
  def initialize(&fn)
    @func = fn
  end
  
  def force
    @func.call
  end
  
  def Delay.trcall(value)
    while value.is_a?(Delay)
      value = value.force
    end
    value
  end
end

Delay.new() はブロックを受け取って @func にセットします。メソッド force() は @func に格納された手続きオブジェクトを実行します。クラスメソッド Delay.trcall() は value が Delay のオブジェクトであるあいだ value を force() で実行します。

fibo_cps() を Delay を使って書き直すと次のようになります。

リスト : フィボナッチ関数 (CPS, 遅延評価)

def fibo_cps(n)
  if n < 2
    yield 1
  else
    Delay.new do
      fibo_cps(n - 1) do |x|
        Delay.new do
          fibo_cps(n - 2) do |y|
            yield(x + y)
          end
        end
      end
    end
  end
end

fibo_cps() の呼び出しを Delay.new() で囲むだけです。あとは Delay.trcall() で値を求めるだけです。

簡単な実行例を示します。

リスト : fibo_cps のテスト

def test_fibo
  20.times do |x|
    a = Delay.trcall(fibo_cps(x) do |y| y end)
    print a, "\n"
  end
end
irb> test_fibo
0
1
1
2
3
5
8
13
21
34
55
89
144
233
377
610
987
1597
2584
4181
=> 20

スタックオーバーフローせずに値を求めることができました。

ところで、元の関数を書き換えずに同様な処理を行うこともできます。athos さんの ブログ を参考にプログラムを作ってみました。athos さんに感謝いたします。

リスト : 末尾再帰をスタックオーバーフローせずに実行する

class Trcall
  def initialize(name)
    @first = true
    eval <<"DEF"
    def #{name}(*args)
      if @first
        @first = false
        value = super(*args)
        while value.instance_of?(Proc)
          value = value.call
        end
        @first = true
        value
      else
        proc { super(*args) }
      end
    end
DEF
  end
end

athos さんのプログラムはアラウンドエイリアスを使っていますが、同名のメソッドを eval() で定義しても同じことができます。クラス Trcall の initialize() は変換する関数をシンボルまたは文字列で受け取ります。そして、eval() で同名のメソッドを定義します。この場合、クラス定義の中で eval() が評価されるので、Trcall のメソッドになります。

トップレベルで定義された関数は Object のメソッドなので、super で簡単に呼び出すことができます。このとき、レシーバ (self) は Trcall のインスタンスなので、元のメソッドで再帰呼び出しが行われていたとしても、Trcall のメソッドが呼び出されることになります。

Trcall のメソッドは最初にインスタンス変数 @first をチェックし、最初の呼び出しであれば super(*args) で元のメソッドを呼び出します。2 回目以降の呼び出しは、super(*args) を手続きオブジェクトに格納して返します。あとは while ループで返り値をチェックして、手続きオブジェクトであればそれを call で呼び出すだけです。

Trcall は次のように使います。

リスト : Trcall の使い方

def fibo_cps1(n)
  if n < 2
    yield 1
  else
    fibo_cps1(n - 1) do |x|
      fibo_cps1(n - 2) do |y|
        yield(x + y)
      end
    end
  end
end

def test_fibo1
  f = Trcall.new(:fibo_cps)
  20.times do |x|
    a = f.fibo_cps1(x) do |y| y end
    print a, "\n"
  end
end
irb> test_fibo1
0
1
1
2
3
5
8
13
21
34
55
89
144
233
377
610
987
1597
2584
4181
=> 20

これでスタックオーバーフローせずに値を求めることができます。

ところで、fibo_cps は末尾再帰になっていますが、関数の呼び出し回数は二重再帰の場合と同じです。したがって、末尾再帰最適化が行われたとしても実行速度は二重再帰の場合とほとんどかわりません。再帰呼び出しを CPS に変換したからといって、効率の良いプログラムになるとは限りません。ご注意くださいませ。


初版 2011 年 2 月 12 日
改訂 2023 年 1 月 21 日

Copyright (C) 2011-2023 Makoto Hiroi
All rights reserved.

[ PrevPage | R u b y | NextPage ]