M.Hiroi's Home Page

Functional Programming

お気楽 Scheme プログラミング入門

[ PrevPage | Scheme | NextPage ]

コルーチン

一般に、ジェネレータ (generator) は呼び出されるたびに新しい値を生成して返す関数のことをいいます。また、継続と継続渡しスタイル で作成したイテレータは、クロージャを呼び出すたびに中断していたプログラムの実行を再開し、次の要素を呼び出した側に返してプログラムの実行を中断します。この処理もジェネレータのひとつと考えることができます。この処理をさらに一般化して、複数のプログラム間で実行の中断や再開を相互に行わせることができます。このようなプログラムのことを「コルーチン (co-routine)」といいます。

サブルーチン (sub-routine) は call してから return するまで途中で処理を中断することはできませんが、コルーチンは途中で処理を中断し、そこから実行を再開することができます。また、コルーチンを使うと複数のプログラムを (擬似的に) 並行に動作させることができます。この動作は「スレッド (thread)」とよく似ています。

一般に、スレッドは一定時間毎に実行するスレッドを強制的に切り替えます。このとき、スレッドのスケジューリングは処理系が行います。これを「プリエンプティブ (preemptive)」といいます。コルーチンの場合、プログラムの実行は一定時間ごとに切り替わるものではなく、プログラム自身が実行を中断しないといけません。これを「ノンプリエンプティブ (nonpreemptive)」といいます。

コルーチンで複数のプログラムを並行に動作させるには、あるプログラムだけを優先的に実行するのではなく、他のプログラムが実行できるよう自主的に処理を中断する、といった協調的な動作を行わせる必要があります。そのかわり、スレッドと違って排他制御といった面倒な処理を考える必要がなく、スレッドのような切り替え時のオーバーヘッドも少ないことから、スレッドよりも動作が軽くて扱いやすいといわれています。

最近では、コルーチンをサポートしているプログラミング言語が増えてきました。M.Hiroi が知っている言語では Lua, Ruby (Fiber : ファイバー), Julia などがあります。また、Eralng のプロセスや Go 言語 の goroutine は、プリエンプティブなマルチスレッドに近いものですが、軽量で高速に動作するといわれています。今回は Lua や Ruby を参考に、Scheme の継続を使ってコルーチンを作成してみましょう。

●コルーチンの動作

今回作成するコルーチンには親子関係を持たせることにします。コルーチン A からコルーチン B を呼び出した場合、A が親で B が子になります。このように主従関係を持つコルーチンを「セミコルーチン (semi-coroutine)」といいます。コルーチンの親子関係は木構造と考えることができます。子のコルーチンは親または祖先のコルーチンを呼び出すことはできません。

新しいコルーチンは coroutine-create で生成します。coroutine-create は引数なしの関数を引数として受け取ります。このような関数を thunk といいます。coroutine-create はコルーチンを表すデータ (リスト) を返します。このデータを coroutine と呼ぶことにしましょう。

コルーチンを実行 (または再開) するには関数 coroutine-resume を使います。coroutine-resume には coroutine-create が返した coroutine と引数をひとつ渡します。coroutine-resume を呼び出したほうが親、呼び出されたほうが子になります。子コルーチンの中で関数 coroutine-yield を評価すると、そこでプログラムの実行を中断して親コルーチンに戻ります。

このとき、coroutine-yield の引数が親コルーチンで呼び出した coroutine-reusme の返り値になります。また、coroutine-resume の 2 番目の引数は coroutine-yeild の返り値になります。ただし、最初に coroutine-resume を呼び出したとき、まだ coroutine-yeild は呼び出されていないので、渡した引数は無視されます。

●コルーチンの作成

それでは継続を使ってコルーチンを作りましょう。次のリストを見てください。

リスト : コルーチン

(define-library (mylib coroutine)
  (import (scheme base))
  (export coroutine-initialize coroutine-create coroutine-yield coroutine-resume)
  (begin
    ;; データ型の定義
    (define-record-type Coroutine
      (make-coroutine save cont)
      coroutine?
      (save get-save set-save!)
      (cont get-cont set-cont!))

    ;; コルーチンの継続を格納
    (define *coroutine* 'root)

    ;; コルーチンの初期化
    (define (coroutine-initialize)
      (set! *coroutine* 'root))

    ;; コルーチンの生成
    (define (coroutine-create proc)
      (make-coroutine #f (lambda (x) (*coroutine* #f (proc)))))

    ;; 実行を中断して値を返す
    (define (coroutine-yield x)
      (call/cc
       (lambda (cont)
         (*coroutine* cont x))))

    ;; 実行を再開する
    (define (coroutine-resume co x)
      (cond
       ((not (coroutine? co))
        (error "no coroutine"))
       ((get-save co)
        (error "double resume"))
       ((not (get-cont co))
        (error "dead coroutine called"))
       (else
        (call-with-values
            (lambda ()
              (call/cc
               (lambda (cont)
                 (set-save! co *coroutine*)
                 (set! *coroutine* cont)
                 ((get-cont co) x))))
          (lambda (cont val)
            (set! *coroutine* (get-save co))
            (set-save! co #f)
            (set-cont! co cont)
            val)))))

    ))

最初にコルーチンを表すレコード型 Coroutine を定義します。大域変数 *coroutine* は coroutine-yield で親コルーチンに戻るための継続をセットします。この変数の使い方は 非決定性 で作成した amb と同じです。

コルーチンを生成する coroutine-create は make-coroutine でレコード型 Coroutine を生成します。フィールド変数 save は *coroutine* を退避するために使います。フィールド変数 cont が coroutine-resume で実行を再開するための継続です。最初は引数 proc を実行するラムダ式をセットします。

継続 *coroutine* が返す値は 2 つあります。最初の値は coroutine-yield のあとに実行を再開するための継続で、次の値が coroutine-resume の返り値になります。ここで、ラムダ式の *coroutine* が一番最後に評価されることに注意してください。このあと、実行するコルーチンの処理はないので、第 1 引数を #f にセットします。第 2 引数は proc の返り値になります。

コルーチンの実行を中断する coroutine-yield も簡単です。call/cc で継続 cont を取り出して、*coroutine* に cont と引数 x を渡して呼び出すだけです。

coroutine-resume はちょっとだけ複雑です。最初にエラーチェックを行います。引数 co が Coroutine 型ではない場合、co はコルーチンではないのでエラーを送出します。第 2 要素が真の場合、そのコルーチンは実行中なので double resume というエラーを送出します。第 3 要素が偽の場合、再開する継続がないので dead coroutine called というエラーを送出します。

エラーがない場合、第 3 要素の継続 (またはラムダ式) を評価して、コルーチンの実行を再開します。coroutine-yield は多値を返すので、それを call-with-values で受け取ります。まず call/cc で coroutine-resume のあとに実行する継続を引数 cont に取り出します。

次に、*coroutine* の値を co のフィールド変数 save に退避し、取り出した継続 cont を *coroutine* にセットします。これで coroutine-yield を呼び出すと、call-with-values の第 2 引数の処理が実行されて、coroutine-resume のあとの処理を再開することができます。

第 2 引数のラムダ式では引数 cont に継続を、val に coroutine-resume に返す値を受け取ります。まず最初に、退避していた *corotine* の値を元に戻して、save の値を #f にします。それから引数 cont を co のフィールド変数 cont にセットします。次に coroutine-resume を呼び出したとき、この継続を評価することでコルーチンの実行を再開することができます。最後に val を返します。

それでは実際に試してみましょう。

gosh[r7rs.user]> (coroutine-initialize)
root
gosh[r7rs.user]> (define c (coroutine-create (lambda ()
(let loop ((n 0)) (display (coroutine-yield n)) (newline) (loop (+ n 1))))))
c
gosh[r7rs.user]> (coroutine-resume c #f)
0
gosh[r7rs.user]> (coroutine-resume c 'a)
a
1
gosh[r7rs.user]> (coroutine-resume c 'b)
b
2
gosh[r7rs.user]> (coroutine-resume c 'c)
c
3
gosh[r7rs.user]> (coroutine-resume c 'd)
d
4

named-let で無限ループを作り、その中で coroutine-yield を呼び出して n の値を返します。最初に coroutine-resume を呼び出すと、コルーチンが評価されて最初の値 0 が返されます。このとき、coroutine-resume に渡された引数 #f は捨てられます。次にcoroutine-resume を呼び出すと、引数 a が coroutine-yield の返り値となり、それが display で表示されます。そして、n の値が +1 されて coroutine-yield が呼び出されて 1 が返されます。coroutine-resume を呼び出すたびに、この処理が繰り返し行われます。

●簡単なテスト

それでは複数のコルーチンを使った簡単なテストを行ってみましょう。次のリストを見てください。

リスト : 簡単なテスト1

(define (make-coroutine code)
  (coroutine-create
    (lambda ()
      (let loop ()
        (display code)
        (coroutine-yield #f)
        (loop)))))

(define a0 (make-coroutine "h"))
(define a1 (make-coroutine "e"))
(define a2 (make-coroutine "y"))
(define a3 (make-coroutine "!"))
(define a4 (make-coroutine " "))

(define (test n)
  (let loop ((n n))
    (when 
     (positive? n)
     (for-each
       (lambda (x) (coroutine-resume x #f))
       (list a0 a1 a2 a3 a4))
     (loop (- n 1)))))

関数 make-coroutine は引数 code を表示するコルーチンを生成します。h, e, y, !, 空白を表示するコルーチンを生成し、関数 test で順番に呼び出すと、指定した回数だけ "hey! " を表示することができます。

実行例を示します。

gosh[r7rs.user]> (load "./test_co.scm")
#t
gosh[r7rs.user]> (coroutine-initialize)
root
gosh[r7rs.user]> (test 5)
hey! hey! hey! hey! hey! #<undef>
gosh[r7rs.user]> (test 10)
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! #<undef>
gosh[r7rs.user]> (test 20)
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! 
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! #<undef>

コルーチンの中から他のコルーチンを呼び出すこともできます。次のリストを見てください。

リスト : 簡単なテスト2

(define (make-coroutine-b code next)
  (coroutine-create
    (lambda ()
      (let loop ()
        (display code)
        (if next
            (coroutine-resume next #f))
        (coroutine-yield #f)
        (loop)))))

(define b0 (make-coroutine-b " " #f))
(define b1 (make-coroutine-b "!" b0))
(define b2 (make-coroutine-b "y" b1))
(define b3 (make-coroutine-b "e" b2))
(define b4 (make-coroutine-b "h" b3))

(define (test-b n)
  (let loop ((n n))
    (when 
     (positive? n)
     (coroutine-resume b4 #f)
     (loop (- n 1)))))

関数 make-coroutine-b は code のほかに次に実行するコルーチン next を受け取ります。コルーチンの中では、code を表示したあと next が真であれば、coroutine-resume で next の実行を再開します。そのあと、coroutine-yield で親コルーチンに戻ります。あとはコルーチンを 5 つ生成して、関数 test-b で最後に生成したコルーチン b4 を呼び出します。実行結果はテスト1と同じになります。

●高階関数をジェネレータに変換

コルーチンを使うと高階関数をジェネレータに変換することも簡単にできます。たとえば、リストを「木」とみなして、木を巡回する高階関数 for-each-tree を考えてみましょう。for-each-tree は次のように簡単にプログラムできます。

リスト : 木の巡回

(define (for-each-tree fn tree)
  (cond 
   ((null? tree) #f)
   ((pair? tree)
    (for-each-tree fn (car tree))
    (for-each-tree fn (cdr tree)))
   (else 
    (fn tree))))

簡単な実行例を示します。

gosh[r7rs.user]> (for-each-tree (lambda (x) (display x) (newline)) '(a (b (c d e) f) g))
a
b
c
d
e
f
g
#f

このような高階関数をジェネレータに変換する場合もコルーチンを使うと簡単にできます。次のリストを見てください。

リスト : 高階関数からジェネレータを生成

(define (make-generator fn . args)
  (coroutine-create
    (lambda ()
      (apply fn (lambda (x) (coroutine-yield x)) args))))

引数 fn は高階関数、そのあとに fn に渡す引数を可変個引数で受け取ります。なお、関数 fn は第 1 引数に関数を受け取るものとします。coroutine-create に渡すラムダ式の中で関数 fn を apply で呼び出します。このとき、第 1 引数にラムダ式を渡して、その中で coroutine-yield を実行します。これで fn が評価されて、第 1 引数で渡したラムダ式が呼び出されると、coroutine-yield により引数 x を coroutine-resume に返して実行が中断されます。

それでは実行してみましょう。

gosh[r7rs.user]> (coroutine-initialize)
root
gosh[r7rs.user]> (define c (make-generator for-each-tree '(a (b (c d e) f) g)))
c
gosh[r7rs.user]> (coroutine-resume c #f)
a
gosh[r7rs.user]> (coroutine-resume c #f)
b
gosh[r7rs.user]> (coroutine-resume c #f)
c
gosh[r7rs.user]> (coroutine-resume c #f)
d
gosh[r7rs.user]> (coroutine-resume c #f)
e
gosh[r7rs.user]> (coroutine-resume c #f)
f
gosh[r7rs.user]> (coroutine-resume c #f)
g
gosh[r7rs.user]> (coroutine-resume c #f)
#f
gosh[r7rs.user]> (coroutine-resume c #f)
*** ERROR: dead coroutine called

coroutine-resume を呼び出すたびに、木の要素を順番に取り出して返します。要素がなくなると for-each-tree の返り値 #f を返します。そのあと、coroutine-resume を呼び出すとエラーが送出されます。

●順列の生成

順列を生成するジェネレータは make-generator を使わなくても、コルーチンで直接プログラムすることができます。次のリストを見てください。

リスト : 順列の生成

(define (gen-perm ls m)
  (coroutine-create
    (lambda ()
      (if (zero? m)
          (coroutine-yield '())
          (let ((gen (gen-perm ls (- m 1))))
            (let loop ((x (coroutine-resume gen #f)))
              (cond (x
                     (for-each
                      (lambda (y)
                        (if (not (member y x))
                            (coroutine-yield (append x (list y)))))
                      ls)
                     (loop (coroutine-resume gen #f)))))))
      #f)))

関数 gen-perm は順列を生成するコルーチンを返します。引数 ls が選択する要素を格納したリスト、m が選ぶ個数です。m が 0 の場合、要素の選択が終わったので coroutine-yield で空リストを返します。そうでなければ、gen-perm を呼び出して新しいコルーチン gen を生成します。

次に named-let でその要素 (順列を格納したリスト) を取り出して x にセットし、それに含まれていない要素 y を選びます。あとは coroutine-yield で y を追加したリストを返します。これで順列を生成することができます。

簡単な実行例を示します。

gosh[r7rs.user]> (coroutine-initialize)
root
gosh[r7rs.user]> (define c (gen-perm '(a b c) 3))
c
gosh[r7rs.user]> (coroutine-resume c #f)
(a b c)
gosh[r7rs.user]> (coroutine-resume c #f)
(a c b)
gosh[r7rs.user]> (coroutine-resume c #f)
(b a c)
gosh[r7rs.user]> (coroutine-resume c #f)
(b c a)
gosh[r7rs.user]> (coroutine-resume c #f)
(c a b)
gosh[r7rs.user]> (coroutine-resume c #f)
(c b a)
gosh[r7rs.user]> (coroutine-resume c #f)
#f

●エラトステネスの篩

最後にコルーチンを使って素数を求めるプログラムを作ってみましょう。考え方は簡単です。最初に、2 から始まる整数列を生成するコルーチンを用意します。この場合、コルーチンを「遅延ストリーム」として使います。2 は素数なので、この整数列から 2 で割り切れる整数を取り除き除きます。ここでもコルーチンを使って、入力ストリームから 2 で割り切れる整数を取り除いたストリームを返すフィルターを作ります。

2 で割り切れる整数が取り除かれたので、次の要素は 3 になります。今度は 3 で割り切れる整数を取り除けばいいのです。これもフィルターを使えば簡単です。このとき、入力用のストリームは 2 で割り切れる整数が取り除かれています。したがって、このストリームに対して 3 で割り切れる整数を取り除くようにフィルターを設定すればいいわけです。

このように、素数を見つけたらそれで割り切れる整数を取り除いていくアルゴリズムを「エラトステネスの篩」といいます。ようするに、2 から始まる整数ストリームに対して、見つけた素数 2, 3, 5, 7, 11, ... を順番にフィルターで設定して素数でない整数をふるい落としていくわけです。

プログラムは次のようになります。

リスト : エラトステネスの篩

;;; n から始まる整数列
(define (integers n)
  (coroutine-create
   (lambda ()
     (let loop ((n n))
       (coroutine-yield n)
       (loop (+ n 1))))))

;;; フィルター
(define (stream-filter pred co)
  (coroutine-create
   (lambda ()
     (let loop ()
       (let ((x (coroutine-resume co #f)))
         (when (pred x)
           (coroutine-yield x))
         (loop))))))

;;; n 個の素数を求める
(define (sieve n)
  (let ((nums (integers 2)))
    (let loop ((n n))
      (when
       (positive? n)
       (let ((x (coroutine-resume nums #f)))
         (display x) (display " ")
         (set! nums
               (stream-filter (lambda (y) (not (zero? (modulo y x)))) nums))
         (loop (- n 1)))))))

関数 integers は n から始まる整数列を生成するストリームです。このような遅延ストリームはコルーチンを使って簡単に作ることができます。関数 filter は述語 pred が偽を返す要素をコルーチン co から取り除きます。co から要素を取り出して x にセットします。pred(x) が真であれば coroutine-yield で親コルーチンに x を返します。これで述語が偽を返す要素を取り除くことができます。

素数を求める関数 sieve も簡単です。引数 n は求める素数の個数です。最初に、2 から始まる整数列を integers で生成して変数 nums に セットします。このストリーム nums の先頭要素が素数になります。cotoutine-resume でストリームから素数を取り出して x にセットします。次に x を表示して、x で割り切れる整数を取り除くフィルターを生成して nums にセットします。つまり、n 個の素数を求めるために、n 個のフィルターをストリームに重ねていくわけです。

それでは実際に (sieve 100) を実行してみましょう。

gosh[r7rs.user]> (coroutine-initialize)
root
gosh[r7rs.user]> (sieve 100)
2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 
101 103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 
193 197 199 211 223 227 229 233 239 241 251 257 263 269 271 277 281 283 
293 307 311 313 317 331 337 347 349 353 359 367 373 379 383 389 397 401 
409 419 421 431 433 439 443 449 457 461 463 467 479 487 491 499 503 509 
521 523 541 #<undef>

正常に動作していますね。


初版 2011 年 4 月 17 日
改訂 2020 年 10 月 11 日

部分継続

「部分継続」は Gauche ver 0.9.1 で導入された新しい機能です。継続はある時点からプログラムを終了するまでの処理を表したものですが、部分継続は処理の範囲を限定する、つまり、ある時点 A からある時点 B までの処理を表したものになります。部分継続は「限定継続」と呼ばれることがあります。

部分継続を使う場合はライブラリ (gauche partcont) をインポートしてください。まずは最初に簡単な例を示しましょう。

gosh[r7rs.user]> (import (gauche partcont))
gosh[r7rs.user]> (+ 1 (reset (* 2 (shift cont 3) 4)) 5)
9
gosh[r7rs.user]> (+ 1 (reset (* 2 (shift cont (cont 3)) 4)) 5)
30
gosh[r7rs.user]> (define k #f)
k
gosh[r7rs.user]> (+ 1 (reset (* 2 (shift cont (set! k cont) 3) 4)) 5)
9
gosh[r7rs.user]> (k 3)
24

部分継続はマクロ reset と shift を使います。(reset ...) で部分継続の範囲を示し、shift の第 1 引数に取り出された部分継続が渡されます。shift は残りの引数を順番に評価し、最後に評価した引数の値が reset の返り値になります。shift の返り値ではなく、reset の返り値になることに注意してください。

最初の例を見てください。部分継続は reset の範囲なので (* 2 [ ] 4) になります。継続と同様に、[ ] の中には部分継続を評価するときに渡される引数が入ります。shift の引数 cont に部分継続が渡され、残りの引数 3 が reset の返り値になります。部分継続として取り出された処理は、そのままでは評価されないことに注意してください。したがって、(+ 1 3 5) を計算して結果は 9 になります。

2 番目の例では、shift の最後で部分継続 cont を評価します。部分継続は (* 2 [ ] 4) なので、cont の引数 3 を [ ] に当てはめ、(* 2 3 4) を計算して結果は 24 になります。これが reset の返り値になるので値は 30 になります。3 番目の例は shift の中で部分継続を大域変数 k にセットします。そして、(k 3) を評価すると 24 になり、部分継続が保存されていることがわかります。

●イテレータを生成する関数

それでは簡単な例題として、継続と継続渡しスタイル で作成したイテレータを生成する関数 make-iter を部分継続を使って書き直してみましょう。make-iter のリストを再掲します。

リスト : イテレータを生成する関数 (再掲)

(define (make-iter proc . args)
  (letrec ((iter
            (lambda (return)
              (apply 
                proc
                (lambda (x)             ; 高階関数に渡す関数の本体
                  (set! return          ; 脱出先継続の書き換え
                   (call/cc
                    (lambda (cont)
                      (set! iter cont)  ; 継続の書き換え
                      (return x)))))
                args)
              ;; 終了後は継続 return で脱出
              (return #f))))
    (lambda ()
      (call/cc
        (lambda (cont) (iter cont))))))

部分継続を使って make-iter を書き直すと次のようになります。

リスト : イテレータを生成する関数 (部分継続)

(define (make-iter1 proc . args)
  (letrec ((iter (lambda ()
                   (apply
                     proc
                     (lambda (x) (shift k (set! iter k) x))
                     args)
                   #f)))
    (lambda () (reset (iter)))))

局所関数 iter は (reset (iter)) で呼び出します。reset が脱出先の継続を表します。iter は proc に渡すラムダ式で shift を使って継続 k を取り出し、iter の値を k に書き換えます。そして、shift の最後で引数 x を評価すれば、その値が reset の返り値になります。これで要素を順番に取り出して返すことができます。

簡単な実行例を示します。

gosh[r7rs.user]> (define a (make-iter1 for-each-tree '(a (b (c d e) f) g)))
a
gosh[r7rs.user]> (a)
a
gosh[r7rs.user]> (a)
b
gosh[r7rs.user]> (a)
c
gosh[r7rs.user]> (a)
d
gosh[r7rs.user]> (a)
e
gosh[r7rs.user]> (a)
f
gosh[r7rs.user]> (a)
g
gosh[r7rs.user]> (a)
#f

●部分継続によるコルーチンの実装

部分継続を使うとコルーチンも簡単に実装することができます。次のリストを見てください。

リスト : コルーチン (2)

(define-library (mylib coroutine1)
  (import (scheme base) (gauche partcont))
  (export coroutine-create coroutine-yield coroutine-resume)
  (begin
    ;; データ型の定義
    (define-record-type Coroutine
      (make-coroutine save cont)
      coroutine?
      (save get-save set-save!)
      (cont get-cont set-cont!))

    ;; コルーチンの生成
    (define (coroutine-create proc)
      (make-coroutine #f (lambda (x) (shift cont (values #f (proc))))))

    ;; 実行を中断して値を返す
    (define (coroutine-yield x)
      (shift cont (values cont x)))

    ;; 実行を再開する
    (define (coroutine-resume co x)
      (cond
       ((not (coroutine? co))
        (error "no coroutine"))
       ((get-save co)
        (error "double resume"))
       ((not (get-cont co))
        (error "dead coroutine called"))
       (else
        (set-save! co #t)
        (let-values (((cont val) (reset ((get-cont co) x))))
          (set-save! co #f)
          (set-cont! co cont)
          val))))
    ))

coroutine-resume でコルーチンを再開するとき、reset でコルーチンを表す部分継続を評価します。coroutine-yield では shift を使って値を返します。これで部分継続を呼び出した reset に戻ることができます。shift では部分継続と値を返すために values を使っていることに注意してください。このため、reset の返り値は let-values で受け取ります。

簡単な実行例を示します。

gosh> (sieve 100)
2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103
107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199 211
223 227 229 233 239 241 251 257 263 269 271 277 281 283 293 307 311 313 317 331
337 347 349 353 359 367 373 379 383 389 397 401 409 419 421 431 433 439 443 449
457 461 463 467 479 487 491 499 503 509 521 523 541 #<undef>

次は、コルーチンの実行速度を比較してみましょう。(time (sieve 500)) で時間を計測した結果を示します。

継続     : 0.41 秒
部分継続 : 1.09 秒
実行環境 : Gauche ver 0.9.9, Ubunts 18.04 (WSL), Intel Core i5-6200U 2.30GHz

Gauche の場合、実行時間は部分継続のほうが少し遅くなるようです。興味のある方はいろいろ試してみてください。

●継続による部分継続の実装

部分継続は継続を使って実装することができます。マクロ shift / reset の実装例は shift-reset.scm にあります。このプログラムを参考に、わかりやすく関数で書き直すと次のようになります。

リスト : 部分継続の実装

(define *partcont* #f)

;;; fn は thunk
(define (reset fn)
  (let ((save *partcont*))
    (call/cc
      (lambda (cont)
        (set! *partcont*
              (lambda (val)
                (set! *partcont* save)
                (cont val)))
        (*partcont* (fn))))))

;;; fn は 1 引数
(define (shift fn)
  (call/cc
    (lambda (cont)
      (*partcont*
        (fn (lambda (val)
              (reset (lambda () (cont val)))))))))

大域変数 *partcont* に脱出先の継続をセットします。関数 reset は簡単です。最初に、*partcont* の値を局所変数 save に退避します。次に call/cc で継続 cont を取り出して、*partcont* の値を書き換えます。ラムダ式の中では、*partcont* の値を元に戻し、引数 val を継続 cont に渡して呼び出します。これで val が reset の返り値になります。最後に、引数 fn の評価結果を *partcont* に渡して呼び出します。

関数 shift は簡単なように見えますが、その動作はかなり複雑です。引数 fn に渡されるラムダ式が部分継続を表します。この処理はあとで詳しく説明します。部分継続を呼び出さない場合の動作は簡単で、fn の評価結果を *partcont* に渡して呼び出すだけです。この場合、*partcont* は reset によりラムダ式がセットされているので、それを評価すると渡した引数が reset の返り値になります。つまり、fn の返り値が reset の返り値になるわけです。

問題は部分継続の処理です。次の動作を例に考えてみましょう。

gosh[r7rs.user]> (+ 1 (reset (lambda () (* 2 (shift (lambda (k) (k 3))) 4))) 5)
30
(reset -------- (1)
  (lambda ()
    (* 2
       (call/cc --------- (2)
         (lambda (cont)
           (*partcont*
             (fn (lambda (val)
                    (reset ------- (3)
                      (lambda () (cont val))))))))
       4)))


        図 : 部分継続の動作

上図を見てください。最初の reset で *partcont* の脱出先は reset (1) に設定されます。次に、shift の中で部分継続を評価します。すると、その中で reset が実行されるので、*partcont* の脱出先は reset (3) に設定されます。次に、部分継続の中で (cont val) が評価され、call/cc (2) の返り値が val になります。動作例では部分継続 (* 2 [ ] 4) に 3 が渡されるので (* 2 3 4) が計算されて 24 になります。この値が reset (1) で実行された fn の返り値になります。

reset の中ではこの値を *partcont* に渡して評価します。すると、reset (3) に戻り、*partcont* の脱出先を reset (1) に戻して、24 が reset (3) の返り値になります。fn では部分継続の評価を一番最後に行っているので、24 が fn の返り値になります。動作例では (lambda (k) (k 3)) の返り値が 24 になります。この値が *partcont* に渡されて評価されるので、reset (1) に戻って 24 を返すことができます。

これで部分継続として動作するのですから、M.Hiroi はとても驚きました。非常に巧妙なプログラムなので、その動作を理解するのは簡単ではないと思います。じっくりと考えてみてください。

簡単な実行例を示します。

gosh[r7rs.user]> (+ 1 (reset (lambda () (* 2 (shift (lambda (k) 3)) 4))) 5)
9
gosh[r7rs.user]> (+ 1 (reset (lambda () (* 2 (shift (lambda (k) (k 3))) 4))) 5)
30
gosh[r7rs.user]> (define c #f)
c
gosh[r7rs.user]> (+ 1 (reset (lambda () (* 2 (shift (lambda (k) (set! c k) 3)) 4))) 5)
9
gosh[r7rs.user]> (c 3)
24
gosh[r7rs.user]> (define d #f)
d
gosh[r7rs.user]> (+ 1 (reset (lambda () (* 20 (shift (lambda (k) (set! d k) 3)) 40))) 5)
9
gosh[r7rs.user]> (d 1)
800
gosh[r7rs.user]> (c 3)
24

部分継続を保存して、あとから実行することもできます。

ところで、このままでは values で多値を返すことができないので、プログラムを次のように修正します。

リスト : 部分継続の実装 (2)

(define *partcont* #f)

;;; fn は thunk
(define (reset fn)
  (let ((save *partcont*))
    (call/cc
      (lambda (cont)
        (set! *partcont*
              (lambda val
                (set! *partcont* save)
                (apply cont val)))
        (let-values ((args (fn)))
          (apply *partcont* args))))))

;;; fn は可変個引数
(define (shift fn)
  (call/cc
    (lambda (cont)
      (let-values ((args (fn (lambda val (reset (lambda () (apply cont val)))))))
        (apply *partcont* args)))))

reset の中で、*partcont* にセットするラムダ式の引数 val を可変個引数で受け取るように変更します。val はリストなので、継続 cont に渡して呼び出すときは apply を使います。次に、fn の返り値を receive で受け取るように変更します。このとき、receive の第 1 引数にシンボルを指定すると、多値をリストに格納して受け取ることができます。この値を *partcont* に渡して評価します。

shift も同様の変更を行います。部分継続を表すラムダ式を可変個引数とし、それを継続 cont に渡して評価します。fn の返り値は receive で受け取り、それを *partcont* に渡して評価します。

それでは簡単な実行例を示します。

gosh[r7rs.user]> (reset (lambda () (let-values (((a b) (shift (lambda (k) (values 1 2))))) (list a b))))
1
2
gosh[r7rs.user]> (reset (lambda () (let-values (((a b) (shift (lambda (k) (k 1 2))))) (list a b))))
(1 2)

最初の例は values で返した多値が reset の返り値になるので 1 と 2 が表示されます。次の例は (k 1 2) で多値を返し、それを receive で受け取ります。変数 a が 1 に、b が 2 になるので、receive の返り値は (1 2) になり、それが reset の返り値になります。

この部分継続を使ってコルーチンを動作させることもできます。次のリストを見てください。

リスト : コルーチン (3)

(define-library (mylib coroutine2)
  (import (scheme base))
  (export reset shift coroutine-create coroutine-yield coroutine-resume)
  (begin
    ;;
    ;; 限定継続
    ;;
    (define *partcont* #f)

    ;; fn は thunk
    (define (reset fn)
      (let ((save *partcont*))
        (call/cc
         (lambda (cont)
           (set! *partcont*
                 (lambda val
                   (set! *partcont* save)
                   (apply cont val)))
           (let-values ((args (fn)))
             (apply *partcont* args))))))

    ;; fn は可変個引数
    (define (shift fn)
      (call/cc
       (lambda (cont)
         (let-values ((args (fn (lambda val (reset (lambda () (apply cont val)))))))
           (apply *partcont* args)))))

    ;;
    ;; コルーチン
    ;;

    ;; データ型の定義
    (define-record-type Coroutine
      (make-coroutine save cont)
      coroutine?
      (save get-save set-save!)
      (cont get-cont set-cont!))

    ;; コルーチンの生成
    (define (coroutine-create proc)
      (make-coroutine #f (lambda (x) (shift (lambda (k) (values #f (proc)))))))

    ;; 実行を中断して値を返す
    (define (coroutine-yield x)
      (shift (lambda (k) (values k x))))

    ;; 実行を再開する
    (define (coroutine-resume co x)
      (cond
       ((not (coroutine? co))
        (error "no coroutine"))
       ((get-save co)
        (error "double resume"))
       ((not (get-cont co))
        (error "dead coroutine called"))
       (else
        (set-save! co #t)
        (let-values (((cont val) (reset (lambda () ((get-cont co) x)))))
          (set-save! co #f)
          (set-cont! co cont)
          val))))
    ))

プログラムは関数 shift, reset の仕様に合わせて修正しただけです。実際に (sieve 500) の実行時間を計測したところ、結果は次のようになりました。

(sieve 500) : 0.66 秒

Gauche の部分継続を使ったバージョンよりも少し速くなりました。もっと時間がかかると思っていたので、この結果にはちょっと驚きました。

●部分継続によるバックトラックの実装

部分継続を使うとバックトラックも簡単に実装することができます。次のリストを見てください。

リスト : 非決定性の選択

(define (choice a b) (shift k (k a) (k b)))

関数 choice は部分継続 k を取り出し、それに引数 a, b を渡して呼び出しています。choice は reset と組み合わせることで、引数 a, b を順番に返す関数として使用することができます。簡単な例を示しましょう。

gosh[r7rs.user]> (import (gauche partcont))
gosh[r7rs.user]> (define (choice a b) (shift k (k a) (k b)))
choice
gosh[r7rs.user]> (reset (let ((a (choice 1 2))) (display a) (newline)))
1
2
#<undef>
gosh[r7rs.user]> (reset (let ((a (choice 1 2)) (b (choice 3 4))) (display (list a b)) (newline)))
(1 3)
(1 4)
(2 3)
(2 4)
#<undef>

部分継続 k の範囲は shift から reset までなので、k を評価することは次の式を呼び出すことと同じ動作になります。

(let ((a [X])) (display a) (newline))

(choice 1 2) の場合、(k 1) を評価すると上式の [X] が 1 になり、局所変数 a が 1 に束縛され、display で 1 が表示されます。部分継続は関数呼び出しと同じなので、reset まで処理が終了すると呼び出し元に戻ってきます。次は choice の shift で (k 2) が評価されるので、変数 a が 2 に束縛されて 2 が表示されます。shift の中で (k 2) が最後に評価される S 式なので、その返り値 #<undef> (newline の返り値) が reset の返り値になります。

次の例は choice を 2 つ使っています。変数 a が 1 に束縛された後、(choice 3 4) を評価する場合、その部分継続に相当する式は次のようになります。

(let ((a 1) (b [X])) (display (list a b) (newline))

(k 3) が評価されると局所変数 b が 3 に束縛されて (1 3) が表示され、(k 4) を評価すると (1 4) が表示されます。(choice 3 4) の評価が終了すると、(choice 1 2) の部分継続 (k 1) の評価が終了し、次は (k 2) が呼び出されます。すると、変数 a が 2 に束縛され、(choice 3 4) が再び評価されるので、(2 3), (2 4) が表示されます。最後に、(choice 1 2) で評価された (k 2) の値が reset の返り値になります。

choice を拡張して複数の引数を順番に返していくことも簡単に実現できます。次のリストを見てください。

リスト : 非決定性の選択 (2)

(define (choice . args) (shift k (for-each (lambda (x) (k x)) args)))

for-each で引数 args の要素を順番に取り出して部分継続 k に渡していくだけです。簡単な実行例を示しましょう。

gosh[r7rs.user]> (define (choice . args) (shift k (for-each (lambda (x) (k x)) args)))
choice
gosh[r7rs.user]> (reset (let ((a (choice 1 2 3)) (b (choice 'a 'b 'c))) (display (list a b)) (newline)))
(1 a)
(1 b)
(1 c)
(2 a)
(2 b)
(2 c)
(3 a)
(3 b)
(3 c)
#<undef>

条件を満たしていない場合にバックトラックする関数 assert も簡単に定義できます。

リスト : 条件 pred を満たさない場合はバックトラックする

(define (assert pred) (shift k (if pred (k #f))))

shift で部分継続 k を取り出します。この場合、部分継続 k は assert 以降の処理になります。pred が真の場合、k を評価して assert 以降の処理を実行します。偽の場合、if の値 (Gauche であれば #<undef>) が reset の返り値になります。

部分継続は shift から reset までの処理を関数にしたもので、reset の返り値は呼び出した部分継続の返り値になります。部分継続の評価中に shift で reset へ脱出することは、その関数呼び出しを終了して呼び出し元に戻ることに相当します。つまり、バックトラックして choice で次の要素を選ぶことができるわけです。

簡単な例を示しましょう。

gosh[r7rs.user]> (define (assert pred) (shift k (if pred (k #f))))
assert
gosh[r7rs.user]> (reset (let ((a (choice 1 2 3 4 5 6))) (assert (even? a)) (display a) (newline)))
2
4
6
#<undef>

assert で条件 (even? a) を指定することで、偶数の要素だけを display で表示することができます。

順列の生成も簡単にできます。

リスト : 順列の生成

(define (perm n xs)
  (let loop ((n n) (a '()))
    (cond
     ((zero? n)
      (display (reverse a))
      (newline))
     (else
      (reset
        (let ((x (apply choice xs)))
          (assert (not (member x a)))
          (loop (- n 1) (cons x a))))))))

choice でリストの要素を 1 つ選び、それが順列 a に含まれていないことを assert で確認します。同じ要素が含まれていれば、バックトラックして異なる要素を選びます。n 個の要素を選んだら reverse でリスト a を反転した値を返します。

簡単な実行例を示します。

gosh[r7rs.user]> (perm 3 '(a b c))
(a b c)
(a c b)
(b a c)
(b c a)
(c a b)
(c b a)
#<undef>

最後に拙作のページ 非決定性 で取り上げた「地図の配色問題」を解いてみましょう。下図に示す簡単な地図を 4 色で塗り分けてみます。

なお、地図は下記文献 (276頁) より引用しました。

プログラムは次のようになります。

リスト : 地図の配色問題

(import (scheme base) (scheme write)
        (mylib list)                 ; プログラムリスト を参照
        (gauche partcont))

(define (choice . args) (shift k (for-each (lambda (x) (k x)) args)))
(define (assert pred) (shift k (if pred (k #f))))

(define (color-map)
  (define regions '(a b c d e f))
  (define adjacent '((a b c d) (b a c e) (c a b d e f)
                     (d a c f) (e b c f) (f c d e)))
  (define (get-color p ls) (cdr (assoc p ls)))
  (define (same-color? region ls)
    (let ((color (get-color region ls)))
      (find-if (lambda (x) (eq? (get-color x ls) color))
               (cdr (assoc region adjacent)))))
  ;;
  (reset
   (let ((m (map (lambda (x) (cons x (choice 'blue 'green 'red 'yellow))) regions)))
     (for-each
      (lambda (x)
        (assert (not (same-color? x m))))
      regions)
     (display m)
     (newline))))

地域と色の対応は連想リストで表します。そして、map で連想リストを作るときに、choice で色を選んでセットするところがポイントです。そして、隣り合った地域で同じ色が使われていないか関数 same-color? でチェックします。同じ色が使われていたら、バックトラックして異なる色を選び直します。最後に display で連想リストを表示します。

実行結果を示します。

gosh[r7rs.user]> (color-map)
((a . blue) (b . green) (c . red) (d . green) (e . blue) (f . yellow))

        ・・・・・省略・・・・・

((a . yellow) (b . red) (c . green) (d . red) (e . yellow) (f . blue))
#<undef>

このように、部分継続 reset / shift を使って簡単にバックトラックを実装することができます。


初版 2011 年 4 月 17 日
改訂 2020 年 10 月 11 日

Copyright (C) 2011-2020 Makoto Hiroi
All rights reserved.

[ PrevPage | Scheme | NextPage ]