M.Hiroi's Home Page

お気楽 Scheme プログラミング入門

応用編 : コルーチン

Copyright (C) 2011-2020 Makoto Hiroi
All rights reserved.

はじめに

一般に、ジェネレータ (generator) は呼び出されるたびに新しい値を生成して返す関数のことをいいます。また、「継続と継続渡しスタイル」で作成したイテレータは、クロージャを呼び出すたびに中断していたプログラムの実行を再開し、次の要素を呼び出した側に返してプログラムの実行を中断します。この処理もジェネレータのひとつと考えることができます。

この処理をさらに一般化して、複数のプログラム間で実行の中断や再開を相互に行わせることができます。このようなプログラムのことを「コルーチン (co-routine)」といいます。

サブルーチン (sub-routine) は call してから return するまで途中で処理を中断することはできませんが、コルーチンは途中で処理を中断し、そこから実行を再開することができます。また、コルーチンを使うと複数のプログラムを (擬似的に) 並行に動作させることができます。この動作は「スレッド (thread)」とよく似ています。

一般に、スレッドは一定時間毎に実行するスレッドを強制的に切り替えます。このとき、スレッドのスケジューリングは処理系が行います。これを「プリエンプティブ (preemptive)」といいます。コルーチンの場合、プログラムの実行は一定時間ごとに切り替わるものではなく、プログラム自身が実行を中断しないといけません。これを「ノンプリエンプティブ (nonpreemptive)」といいます。

コルーチンで複数のプログラムを並行に動作させるには、あるプログラムだけを優先的に実行するのではなく、他のプログラムが実行できるよう自主的に処理を中断する、といった協調的な動作を行わせる必要があります。

そのかわり、スレッドと違って排他制御といった面倒な処理を考える必要がなく、スレッドのような切り替え時のオーバーヘッドも少ないことから、スレッドよりも動作が軽くて扱いやすいといわれています。

最近では、コルーチンをサポートしているプログラミング言語が増えてきました。M.Hiroi が知っている言語では Lua, Ruby (Fiber : ファイバー), Julia などがあります。また、Eralng のプロセスや Go 言語 の goroutine は、プリエンプティブなマルチスレッドに近いものですが、軽量で高速に動作するといわれています。

今回は Lua や Ruby を参考に、Scheme の継続を使ってコルーチンを作成してみましょう。

●コルーチンの動作

今回作成するコルーチンには親子関係を持たせることにします。コルーチン A からコルーチン B を呼び出した場合、A が親で B が子になります。このように主従関係を持つコルーチンを「セミコルーチン (semi-coroutine)」といいます。コルーチンの親子関係は木構造と考えることができます。子のコルーチンは親または祖先のコルーチンを呼び出すことはできません。

新しいコルーチンは coroutine-create で生成します。coroutine-create は引数なしの関数を引数として受け取ります。このような関数を thunk といいます。coroutine-create はコルーチンを表すデータ (リスト) を返します。このデータを coroutine と呼ぶことにしましょう。

コルーチンを実行 (または再開) するには関数 coroutine-resume を使います。coroutine-resume には coroutine-create が返した coroutine と引数をひとつ渡します。coroutine-resume を呼び出したほうが親、呼び出されたほうが子になります。子コルーチンの中で関数 coroutine-yield を評価すると、そこでプログラムの実行を中断して親コルーチンに戻ります。

このとき、coroutine-yield の引数が親コルーチンで呼び出した coroutine-reusme の返り値になります。また、coroutine-resume の 2 番目の引数は coroutine-yeild の返り値になります。ただし、最初に coroutine-resume を呼び出したとき、まだ coroutine-yeild は呼び出されていないので、渡した引数は無視されます。

●コルーチンの作成

それでは継続を使ってコルーチンを作りましょう。次のリストを見てください。

リスト : コルーチン

(define-library (mylib coroutine)
  (import (scheme base))
  (export coroutine-initialize coroutine-create coroutine-yield coroutine-resume)
  (begin
    ;; データ型の定義
    (define-record-type Coroutine
      (make-coroutine save cont)
      coroutine?
      (save get-save set-save!)
      (cont get-cont set-cont!))

    ;; コルーチンの継続を格納
    (define *coroutine* 'root)

    ;; コルーチンの初期化
    (define (coroutine-initialize)
      (set! *coroutine* 'root))

    ;; コルーチンの生成
    (define (coroutine-create proc)
      (make-coroutine #f (lambda (x) (*coroutine* #f (proc)))))

    ;; 実行を中断して値を返す
    (define (coroutine-yield x)
      (call/cc
       (lambda (cont)
         (*coroutine* cont x))))

    ;; 実行を再開する
    (define (coroutine-resume co x)
      (cond
       ((not (coroutine? co))
        (error "no coroutine"))
       ((get-save co)
        (error "double resume"))
       ((not (get-cont co))
        (error "dead coroutine called"))
       (else
        (call-with-values
            (lambda ()
              (call/cc
               (lambda (cont)
                 (set-save! co *coroutine*)
                 (set! *coroutine* cont)
                 ((get-cont co) x))))
          (lambda (cont val)
            (set! *coroutine* (get-save co))
            (set-save! co #f)
            (set-cont! co cont)
            val)))))

    ))

最初にコルーチンを表すレコード型 Coroutine を定義します。大域変数 *coroutine* は coroutine-yield で親コルーチンに戻るための継続をセットします。この変数の使い方は「非決定性」で作成した amb と同じです。

コルーチンを生成する coroutine-create は make-coroutine でレコード型 Coroutine を生成します。フィールド変数 save は *coroutine* を退避するために使います。フィールド変数 cont が coroutine-resume で実行を再開するための継続です。最初は引数 proc を実行するラムダ式をセットします。

継続 *coroutine* が返す値は 2 つあります。最初の値は coroutine-yield のあとに実行を再開するための継続で、次の値が coroutine-resume の返り値になります。ここで、ラムダ式の *coroutine* が一番最後に評価されることに注意してください。このあと、実行するコルーチンの処理はないので、第 1 引数を #f にセットします。第 2 引数は proc の返り値になります。

コルーチンの実行を中断する coroutine-yield も簡単です。call/cc で継続 cont を取り出して、*coroutine* に cont と引数 x を渡して呼び出すだけです。

coroutine-resume はちょっとだけ複雑です。最初にエラーチェックを行います。引数 co が Coroutine 型ではない場合、co はコルーチンではないのでエラーを送出します。第 2 要素が真の場合、そのコルーチンは実行中なので double resume というエラーを送出します。第 3 要素が偽の場合、再開する継続がないので dead coroutine called というエラーを送出します。

エラーがない場合、第 3 要素の継続 (またはラムダ式) を評価して、コルーチンの実行を再開します。coroutine-yield は多値を返すので、それを call-with-values で受け取ります。まず call/cc で coroutine-resume のあとに実行する継続を引数 cont に取り出します。

次に、*coroutine* の値を co のフィールド変数 save に退避し、取り出した継続 cont を *coroutine* にセットします。これで coroutine-yield を呼び出すと、call-with-values の第 2 引数の処理が実行されて、coroutine-resume のあとの処理を再開することができます。

第 2 引数のラムダ式では引数 cont に継続を、val に coroutine-resume に返す値を受け取ります。まず最初に、退避していた *corotine* の値を元に戻して、save の値を #f にします。

それから引数 cont を co のフィールド変数 cont にセットします。次に coroutine-resume を呼び出したとき、この継続を評価することでコルーチンの実行を再開することができます。最後に val を返します。

それでは実際に試してみましょう。

gosh[r7rs.user]> (coroutine-initialize)
root
gosh[r7rs.user]> (define c (coroutine-create (lambda ()
(let loop ((n 0)) (display (coroutine-yield n)) (newline) (loop (+ n 1))))))
c
gosh[r7rs.user]> (coroutine-resume c #f)
0
gosh[r7rs.user]> (coroutine-resume c 'a)
a
1
gosh[r7rs.user]> (coroutine-resume c 'b)
b
2
gosh[r7rs.user]> (coroutine-resume c 'c)
c
3
gosh[r7rs.user]> (coroutine-resume c 'd)
d
4

named-let で無限ループを作り、その中で coroutine-yield を呼び出して n の値を返します。最初に coroutine-resume を呼び出すと、コルーチンが評価されて最初の値 0 が返されます。このとき、coroutine-resume に渡された引数 #f は捨てられます。

次にcoroutine-resume を呼び出すと、引数 a が coroutine-yield の返り値となり、それが display で表示されます。そして、n の値が +1 されて coroutine-yield が呼び出されて 1 が返されます。coroutine-resume を呼び出すたびに、この処理が繰り返し行われます。

●簡単なテスト

それでは複数のコルーチンを使った簡単なテストを行ってみましょう。次のリストを見てください。

リスト : 簡単なテスト1

(define (make-coroutine code)
  (coroutine-create
    (lambda ()
      (let loop ()
        (display code)
        (coroutine-yield #f)
        (loop)))))

(define a0 (make-coroutine "h"))
(define a1 (make-coroutine "e"))
(define a2 (make-coroutine "y"))
(define a3 (make-coroutine "!"))
(define a4 (make-coroutine " "))

(define (test n)
  (let loop ((n n))
    (when 
     (positive? n)
     (for-each
       (lambda (x) (coroutine-resume x #f))
       (list a0 a1 a2 a3 a4))
     (loop (- n 1)))))

関数 make-coroutine は引数 code を表示するコルーチンを生成します。h, e, y, !, 空白を表示するコルーチンを生成し、関数 test で順番に呼び出すと、指定した回数だけ "hey! " を表示することができます。

実行例を示します。

gosh[r7rs.user]> (load "./test_co.scm")
#t
gosh[r7rs.user]> (coroutine-initialize)
root
gosh[r7rs.user]> (test 5)
hey! hey! hey! hey! hey! #<undef>
gosh[r7rs.user]> (test 10)
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! #<undef>
gosh[r7rs.user]> (test 20)
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! 
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey! #<undef>

コルーチンの中から他のコルーチンを呼び出すこともできます。次のリストを見てください。

リスト : 簡単なテスト2

(define (make-coroutine-b code next)
  (coroutine-create
    (lambda ()
      (let loop ()
        (display code)
        (if next
            (coroutine-resume next #f))
        (coroutine-yield #f)
        (loop)))))

(define b0 (make-coroutine-b " " #f))
(define b1 (make-coroutine-b "!" b0))
(define b2 (make-coroutine-b "y" b1))
(define b3 (make-coroutine-b "e" b2))
(define b4 (make-coroutine-b "h" b3))

(define (test-b n)
  (let loop ((n n))
    (when 
     (positive? n)
     (coroutine-resume b4 #f)
     (loop (- n 1)))))

関数 make-coroutine-b は code のほかに次に実行するコルーチン next を受け取ります。コルーチンの中では、code を表示したあと next が真であれば、coroutine-resume で next の実行を再開します。

そのあと、coroutine-yield で親コルーチンに戻ります。あとはコルーチンを 5 つ生成して、関数 test-b で最後に生成したコルーチン b4 を呼び出します。実行結果はテスト1と同じになります。

●高階関数をジェネレータに変換

コルーチンを使うと高階関数をジェネレータに変換することも簡単にできます。たとえば、リストを「木」とみなして、木を巡回する高階関数 for-each-tree を考えてみましょう。for-each-tree は次のように簡単にプログラムできます。

リスト : 木の巡回

(define (for-each-tree fn tree)
  (cond 
   ((null? tree) #f)
   ((pair? tree)
    (for-each-tree fn (car tree))
    (for-each-tree fn (cdr tree)))
   (else 
    (fn tree))))

簡単な実行例を示します。

gosh[r7rs.user]> (for-each-tree (lambda (x) (display x) (newline)) 
'(a (b (c d e) f) g))
a
b
c
d
e
f
g
#f

このような高階関数をジェネレータに変換する場合もコルーチンを使うと簡単にできます。次のリストを見てください。

リスト : 高階関数からジェネレータを生成

(define (make-generator fn . args)
  (coroutine-create
    (lambda ()
      (apply fn (lambda (x) (coroutine-yield x)) args))))

引数 fn は高階関数、そのあとに fn に渡す引数を可変個引数で受け取ります。なお、関数 fn は第 1 引数に関数を受け取るものとします。coroutine-create に渡すラムダ式の中で関数 fn を apply で呼び出します。このとき、第 1 引数にラムダ式を渡して、その中で coroutine-yield を実行します。これで fn が評価されて、第 1 引数で渡したラムダ式が呼び出されると、coroutine-yield により引数 x を coroutine-resume に返して実行が中断されます。

それでは実行してみましょう。

gosh[r7rs.user]> (coroutine-initialize)
root
gosh[r7rs.user]> (define c (make-generator for-each-tree '(a (b (c d e) f) g)))
c
gosh[r7rs.user]> (coroutine-resume c #f)
a
gosh[r7rs.user]> (coroutine-resume c #f)
b
gosh[r7rs.user]> (coroutine-resume c #f)
c
gosh[r7rs.user]> (coroutine-resume c #f)
d
gosh[r7rs.user]> (coroutine-resume c #f)
e
gosh[r7rs.user]> (coroutine-resume c #f)
f
gosh[r7rs.user]> (coroutine-resume c #f)
g
gosh[r7rs.user]> (coroutine-resume c #f)
#f
gosh[r7rs.user]> (coroutine-resume c #f)
*** ERROR: dead coroutine called

coroutine-resume を呼び出すたびに、木の要素を順番に取り出して返します。要素がなくなると for-each-tree の返り値 #f を返します。そのあと、coroutine-resume を呼び出すとエラーが送出されます。

●順列の生成

順列を生成するジェネレータは make-generator を使わなくても、コルーチンで直接プログラムすることができます。次のリストを見てください。

リスト : 順列の生成

(define (gen-perm ls m)
  (coroutine-create
    (lambda ()
      (if (zero? m)
          (coroutine-yield '())
          (let ((gen (gen-perm ls (- m 1))))
            (let loop ((x (coroutine-resume gen #f)))
              (cond (x
                     (for-each
                      (lambda (y)
                        (if (not (member y x))
                            (coroutine-yield (append x (list y)))))
                      ls)
                     (loop (coroutine-resume gen #f)))))))
      #f)))

関数 gen-perm は順列を生成するコルーチンを返します。引数 ls が選択する要素を格納したリスト、m が選ぶ個数です。m が 0 の場合、要素の選択が終わったので coroutine-yield で空リストを返します。そうでなければ、gen-perm を呼び出して新しいコルーチン gen を生成します。

次に named-let でその要素 (順列を格納したリスト) を取り出して x にセットし、それに含まれていない要素 y を選びます。あとは coroutine-yield で y を追加したリストを返します。これで順列を生成することができます。

簡単な実行例を示します。

gosh[r7rs.user]> (coroutine-initialize)
root
gosh[r7rs.user]> (define c (gen-perm '(a b c) 3))
c
gosh[r7rs.user]> (coroutine-resume c #f)
(a b c)
gosh[r7rs.user]> (coroutine-resume c #f)
(a c b)
gosh[r7rs.user]> (coroutine-resume c #f)
(b a c)
gosh[r7rs.user]> (coroutine-resume c #f)
(b c a)
gosh[r7rs.user]> (coroutine-resume c #f)
(c a b)
gosh[r7rs.user]> (coroutine-resume c #f)
(c b a)
gosh[r7rs.user]> (coroutine-resume c #f)
#f

●エラトステネスの篩

最後にコルーチンを使って素数を求めるプログラムを作ってみましょう。考え方は簡単です。最初に、2 から始まる整数列を生成するコルーチンを用意します。この場合、コルーチンを「遅延ストリーム」として使います。2 は素数なので、この整数列から 2 で割り切れる整数を取り除き除きます。ここでもコルーチンを使って、入力ストリームから 2 で割り切れる整数を取り除いたストリームを返すフィルターを作ります。

2 で割り切れる整数が取り除かれたので、次の要素は 3 になります。今度は 3 で割り切れる整数を取り除けばいいのです。これもフィルターを使えば簡単です。このとき、入力用のストリームは 2 で割り切れる整数が取り除かれています。したがって、このストリームに対して 3 で割り切れる整数を取り除くようにフィルターを設定すればいいわけです。

このように、素数を見つけたらそれで割り切れる整数を取り除いていくアルゴリズムを「エラトステネスの篩」といいます。ようするに、2 から始まる整数ストリームに対して、見つけた素数 2, 3, 5, 7, 11, ... を順番にフィルターで設定して素数でない整数をふるい落としていくわけです。

プログラムは次のようになります。

リスト : エラトステネスの篩

;;; n から始まる整数列
(define (integers n)
  (coroutine-create
   (lambda ()
     (let loop ((n n))
       (coroutine-yield n)
       (loop (+ n 1))))))

;;; フィルター
(define (stream-filter pred co)
  (coroutine-create
   (lambda ()
     (let loop ()
       (let ((x (coroutine-resume co #f)))
         (when (pred x)
           (coroutine-yield x))
         (loop))))))

;;; n 個の素数を求める
(define (sieve n)
  (let ((nums (integers 2)))
    (let loop ((n n))
      (when
       (positive? n)
       (let ((x (coroutine-resume nums #f)))
         (display x) (display " ")
         (set! nums
               (stream-filter (lambda (y) (not (zero? (modulo y x)))) nums))
         (loop (- n 1)))))))

関数 integers は n から始まる整数列を生成するストリームです。このような遅延ストリームはコルーチンを使って簡単に作ることができます。関数 filter は述語 pred が偽を返す要素をコルーチン co から取り除きます。co から要素を取り出して x にセットします。pred(x) が真であれば coroutine-yield で親コルーチンに x を返します。これで述語が偽を返す要素を取り除くことができます。

素数を求める関数 sieve も簡単です。引数 n は求める素数の個数です。最初に、2 から始まる整数列を integers で生成して変数 nums に セットします。このストリーム nums の先頭要素が素数になります。cotoutine-resume でストリームから素数を取り出して x にセットします。次に x を表示して、x で割り切れる整数を取り除くフィルターを生成して nums にセットします。つまり、n 個の素数を求めるために、n 個のフィルターをストリームに重ねていくわけです。

それでは実際に (sieve 100) を実行してみましょう。

gosh[r7rs.user]> (coroutine-initialize)
root
gosh[r7rs.user]> (sieve 100)
2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 
101 103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 
193 197 199 211 223 227 229 233 239 241 251 257 263 269 271 277 281 283 
293 307 311 313 317 331 337 347 349 353 359 367 373 379 383 389 397 401 
409 419 421 431 433 439 443 449 457 461 463 467 479 487 491 499 503 509 
521 523 541 #<undef>

正常に動作していますね。


初版 2011 年 4 月 17 日
改訂 2020 年 10 月 11 日