M.Hiroi's Home Page

Functional Programming

お気楽 Scheme プログラミング入門

[ PrevPage | Scheme | NextPage ]

遅延ストリーム (3)

前回まで作成した遅延ストリームは、stream-cons で遅延ストリームを生成するとき、ストリームの要素となる引数を評価していました。たとえば、(stream-cons (func x) ...) とすると、(func x) を評価した値がストリームの要素となります。ここで、ストリームにまだアクセスしていないのに、(func x) が評価されていることに注意してください。もし、(func x) がデータの入力処理だとすると、遅延ストリームを生成するときにデータをひとつ先読みしてしまうことになります。

そこで、コンスセルの CAR 部と CDR 部をまとめて遅延評価することにします。この場合、プロミスが遅延ストリームを表すことになります。今回はこの方法で遅延ストリームを作ってみましょう。

●遅延ストリームをプロミスで表す

遅延ストリームをプロミスで表す場合、その構造はつぎのようになります。

リスト : 遅延ストリーム

(define-library (mylib lazystream1)
  (import (scheme base) (scheme lazy) (scheme case-lambda))
  (export ・・・省略・・・ )
  (begin
    ;; 遅延ストリームの基本関数
    (define-syntax stream-cons
      (syntax-rules ()
        ((_ a b) (delay (cons a b)))))

    ;; 先頭要素を取り出す
    (define (stream-car s) (car (force s)))

    ;; 先頭要素を取り除く
    (define (stream-cdr s) (cdr (force s)))

    ;; ストリームの終端
    (define nil (delay '()))
    (define (stream-empty? s) (null? (force s)))
    (define (stream-empty) nil)

    ・・・省略・・・

  ))

stream-cons は (cons a (delay b)) ではなく (delay (cons a b)) とします。これで stream-cons の引数 a, b が遅延評価されます。stream-car と stream-cdr は遅延ストリーム s を force で評価してから car と cdr を適用します。遅延ストリームはプロミスで表すので、終端 nil の定義は (delay '()) とします。stream-empty? も遅延ストリーム s を force で評価してから null? でチェックします。

●stream-delay

ここで stream-empty? を評価すると、遅延ストリームが force されることに注意してください。たとえば、遅延ストリームを連結する stream-append を次のように定義すると問題が発生します。

リスト : 遅延ストリームの連結 (間違い版)

;; 整数列の生成
(define (stream-range low high)
  (if (> low high)
      nil
      (stream-cons low (stream-range (+ 1 low) high))))

;; 遅延ストリームの連結 (間違い版)
(define (stream-append-bad s1 s2)
  (if (stream-empty? s1)
      s2
      (stream-cons (stream-car s1)
                   (stream-append-bad (stream-cdr s1) s2))))

stream-append-bad でストリームを生成するとき、empty? で s1 が force されることになります。つまり、新しいストリームを生成する前に引数のストリームが評価されてしまうのです。次の例を見てください。

gosh[r7rs.user]> (define s1 (range 1 4))
s1
gosh[r7rs.user]> (define s2 (range 5 8))
s2
gosh[r7rs.user]> (define s3 (stream-append-bad s1 s2))
s3
gosh[r7rs.user]> s1
#<promise ... (forced)>
gosh[r7rs.user]> s2
#<promise ...>
gosh[r7rs.user]> s3
#<promise ...>

s1 と s2 を連結した新しいストリーム s3 を評価していないにもかかわらず、引数のストリーム s1 が force されていることがわかります。この場合、stream-append の本体を delay と force で囲みます。

リスト : 遅延ストリームの連結 (修正版)

(define (stream-append s1 s2)
  (delay
   (force
    (if (stream-empty? s1)
        s2
        (stream-cons (stream-car s1)
                     (stream-append (stream-cdr s1) s2))))))

delay と force で囲むのは無駄なように思いますが、これにより stream-append を評価して遅延ストリームを生成するとき、引数 s1 の遅延ストリームが force されずにすむわけです。

実際には、次に示すようなマクロを定義すると簡単です。

リスト : 式 expr の遅延ストリームを返す

(define-syntax stream-delay
  (syntax-rules ()
    ((_ expr) (delay (force expr)))))
リスト : 遅延ストリームの連結 (完成版)

(define (stream-append s1 s2)
  (stream-delay
    (if (stream-empty? s1)
        s2
        (stream-cons (stream-car s1)
                     (stream-append (stream-cdr s1) s2)))))

簡単な実行例を示します。

gosh[r7rs.user]> (define s4 (stream-range 1 4))
s4
gosh[r7rs.user]> (define s5 (stream-range 5 8))
s5
gosh[r7rs.user]> (define s6 (stream-append s4 s5))
s6
gosh[r7rs.user]> s4
#<promise ...>
gosh[r7rs.user]> s5
#<promise ...>
gosh[r7rs.user]> s6
#<promise ...>
gosh[r7rs.user]> (stream-car s6)
1
gosh[r7rs.user]> (stream-car (stream-cdr s6))
2
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr s6)))
3
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr (stream-cdr s6))))
4
gosh[r7rs.user]> (stream-car (stream-cdr (stream-cdr (stream-cdr (stream-cdr s6)))))
5

このように、stream-delay を使うことで、新しい遅延ストリームを生成するとき、引数のストリームが force されるのを防止することができます。

同様に stream-map や stream-filter など、遅延ストリームを受け取って新しい遅延ストリームを返す関数は stream-delay で囲む必要があります。詳細は プログラムリスト をお読みください。

●実行速度の比較

それでは簡単な実行例として、素数を求めるプログラムで実行速度を比較してみましょう。

リスト : 素数を求める

(define *primes*
  (stream-cons 2 (stream-cons 3 (stream-cons 5 (primes-from 7)))))

(define (primes-from n)
  (if (prime? n)
      (stream-cons n (primes-from (+ n 2)))
      (primes-from (+ n 2))))

(define (prime? n)
  (let loop ((s (stream-cdr *primes*)))
    (let ((p (stream-car s)))
      (cond
       ((> (* p p) n) #t)
       ((zero? (modulo n p)) #f)
       (else
        (loop (stream-cdr s)))))))
リスト : 簡単なテスト

(define (test ps n)
  (let ((s (current-jiffy)))
    (display (stream-ref ps n))
    (newline)
    (inexact (/ (- (current-jiffy) s) (jiffies-per-second)))))

素数列 primes の定義は 遅延ストリーム (1) で作成したものと同じです。(stream-ref primes 20000) の実行時間を求めたところ、結果は次のようになりました。

lazystream.scm  : 0.29 秒
lazystream1.scm : 0.40 秒

実行環境 : Ubunts 18.04 (Windows Subsystem for Linux), Intel Core i5-6200U 2.30GHz, Gauche ver 0.9.9

今回の遅延ストリームのほうが少し遅くなりました。興味のある方はいろいろ試してみてください。

●問題点

今回は単純に delay と force を使いましたが、この方法では末尾再帰的なアルゴリズムとの相性がよくないことがわかっているそうです。詳しい説明は Gauche のリファレンス 遅延評価 や Yutaka Hara さんの R7RSのdelay-forceとは何か をお読みください。


●プログラムリスト

;;;
;;; lazystream1.scm : 遅延ストリーム (CAR 部も遅延評価する)
;;;
;;;                   Copyright (C) 2017-2020 Makoto Hiroi
;;;
(define-library (mylib lazystream1)
  (import (scheme base) (scheme lazy) (scheme case-lambda))
  (export stream-cons stream-car stream-cdr stream-empty? stream-empty stream-delay
          stream-range stream-unfold list->stream stream->list stream-ref
          stream-take stream-drop stream-append stream-append-delay interleave
          stream-map stream-flatmap stream-filter stream-foldl stream-foldr
          stream-scanl stream-for-each stream-take-while stream-drop-while
          stream-merge stream-union stream-intersect stream-perm
          sieve *primes*
   )
  (begin
    ;; 遅延ストリームの基本関数
    (define-syntax stream-cons
      (syntax-rules ()
        ((_ a b) (delay (cons a b)))))

    ;; 先頭要素を取り出す
    (define (stream-car s) (car (force s)))

    ;; 先頭要素を取り除く
    (define (stream-cdr s) (cdr (force s)))

    ;; ストリームの終端
    (define nil (delay '()))
    (define (stream-empty? s) (null? (force s)))
    (define (stream-empty) nil)

    ;; 式 expr の遅延ストリームを返す
    (define-syntax stream-delay
      (syntax-rules ()
        ((_ expr) (delay (force expr)))))

    ;;
    ;; ストリームの生成
    ;;

    ;; 数列を生成するストリーム
    (define (stream-range low high)
      (if (> low high)
          nil
          (stream-cons low (stream-range (+ 1 low) high))))

    ;; 逆畳み込み
    (define stream-unfold
      (case-lambda
       ((iterate seed)
        (stream-unfold iterate seed (lambda (x) #f)))
       ((iterate seed pred)
        (if (pred seed)
            nil
            (stream-cons seed
                         (stream-unfold iterate (iterate seed) pred))))))

    ;; リストとストリームの変換
    (define (list->stream xs)
      (if (null? xs)
          nil
          (stream-cons (car xs) (list->stream (cdr xs)))))

    (define (stream->list s)
      (if (stream-empty? s)
          '()
          (cons (stream-car s) (stream->list (stream-cdr s)))))

    ;; 基本的な操作関数
    (define (stream-ref s n)
      (if (zero? n)
          (stream-car s)
          (stream-ref (stream-cdr s) (- n 1))))

    (define (stream-take s n)
      (stream-delay
       (if (or (stream-empty? s) (zero? n))
           nil
           (stream-cons (stream-car s)
                        (stream-take (stream-cdr s) (- n 1))))))

    (define (stream-drop s n)
      (stream-delay
       (if (or (stream-empty? s) (zero? n))
           s
           (stream-drop (stream-cdr s) (- n 1)))))

    ;; 遅延ストリームの連結
    (define (stream-append s1 s2)
      (stream-delay
       (if (stream-empty? s1)
           s2
           (stream-cons (stream-car s1)
                        (stream-append (stream-cdr s1) s2)))))

    ;; 遅延評価版
    (define (stream-append-delay s1 s2)
      (stream-delay
       (if (stream-empty? s1)
           (force s2)
           (stream-cons (stream-car s1)
                        (stream-append-delay (stream-cdr s1) s2)))))

    ;; ストリームの要素を交互に出力
    (define (interleave s1 s2)
      (stream-delay
       (if (stream-empty? s1)
           s2
           (stream-cons (stream-car s1)
                        (interleave s2 (stream-cdr s1))))))

    ;;
    ;; 高階関数
    ;;

    ;; 簡易版
    (define (any pred xs)
      (if (null? xs)
          #f
          (or (pred (car xs)) (any pred (cdr xs)))))

    ;; マッピング
    (define (map-1 proc s)
      (stream-delay
       (if (stream-empty? s)
           nil
           (stream-cons (proc (stream-car s))
                        (map-1 proc (stream-cdr s))))))

    (define (map-n proc ss)
      (stream-delay
       (if (any stream-empty? ss)
           nil
           (stream-cons (apply proc (map stream-car ss))
                        (map-n proc (map stream-cdr ss))))))

    (define stream-map
      (case-lambda
       ((proc s)
        (map-1 proc s))
       ((proc s . args)
        (map-n proc (cons s args)))))

    ;; マッピングの結果を平坦化する
    (define (stream-flatmap proc s)
      (stream-delay
       (if (stream-empty? s)
           nil
           (stream-append-delay (proc (stream-car s))
                                (delay (stream-flatmap proc (stream-cdr s)))))))

    ;; フィルター
    (define (stream-filter pred s)
      (stream-delay
       (cond
        ((stream-empty? s) nil)
        ((pred (stream-car s))
         (stream-cons (stream-car s)
                      (stream-filter pred (stream-cdr s))))
        (else
         (stream-filter pred (stream-cdr s))))))

    ;; 畳み込み
    (define (foldl-1 proc a s)
      (if (stream-empty? s)
          a
          (foldl-1 proc (proc a (stream-car s)) (stream-cdr s))))

    (define (foldl-n proc a ss)
      (if (any stream-empty? ss)
          a
          (foldl-n proc
                   (apply proc a (map stream-car ss))
                   (map stream-cdr ss))))

    (define stream-foldl
      (case-lambda
       ((proc a s)
        (foldl-1 proc a s))
       ((proc a s . args)
        (foldl-n proc a (cons s args)))))

    (define (foldr-1 proc a s)
      (if (stream-empty? s)
          a
          (proc (foldr-1 proc a (stream-cdr s)) (stream-car s))))

    (define (foldr-n proc a ss)
      (if (any stream-empty? ss)
          a
          (apply proc
                 (foldr-n proc a (map stream-cdr ss))
                 (map stream-car ss))))

    (define stream-foldr
      (case-lambda
       ((proc a s)
        (foldr-1 proc a s))
       ((proc a s . args)
        (foldr-n proc a (cons s args)))))

    (define (scanl-1 proc a s)
      (if (stream-empty? s)
          (stream-cons a nil)
          (stream-cons a (scanl-1 proc (proc a (stream-car s)) (stream-cdr s)))))

    (define (scanl-n proc a ss)
      (if (any stream-empty? ss)
          (stream-cons a nil)
          (stream-cons a (scanl-n proc
                                  (apply proc a (map stream-car ss))
                                  (map stream-cdr ss)))))

    (define stream-scanl
      (case-lambda
       ((proc a s)
        (scanl-1 proc a s))
       ((proc a s . args)
        (scanl-n proc a (cons s args)))))

    ;; 巡回
    (define (for-each-1 proc s)
      (unless
       (stream-empty? s)
       (proc (stream-car s))
       (for-each-1 proc (stream-cdr s))))

    (define (for-each-n proc ss)
      (unless
       (any stream-empty? ss)
       (apply proc (map stream-car ss))
       (for-each-n proc (map stream-cdr ss))))

    (define stream-for-each
      (case-lambda
       ((proc s)
        (for-each-1 proc s))
       ((proc s . args)
        (for-each-n proc (cons s args)))))

    (define (stream-take-while pred s)
      (if (or (stream-empty? s) (not (pred (stream-car s))))
          nil
          (stream-cons (stream-car s)
                       (stream-take-while pred (stream-cdr s)))))

    (define (stream-drop-while pred s)
      (if (or (stream-empty? s) (not (pred (stream-car s))))
          s
          (stream-drop-while pred (stream-cdr s))))

    ;; 遅延ストリームの併合
    (define (stream-merge s1 s2)
      (stream-delay
       (cond
        ((stream-empty? s1) s2)
        ((stream-empty? s2) s1)
        ((<= (stream-car s1) (stream-car s2))
         (stream-cons (stream-car s1) (stream-merge (stream-cdr s1) s2)))
        (else
         (stream-cons (stream-car s2) (stream-merge s1 (stream-cdr s2)))))))

    ;; 和集合
    (define (stream-union s1 s2)
      (stream-delay
       (cond
        ((stream-empty? s1) s2)
        ((stream-empty? s2) s1)
        ((< (stream-car s1) (stream-car s2))
         (stream-cons (stream-car s1)
                      (stream-union (stream-cdr s1) s2)))
        ((> (stream-car s1) (stream-car s2))
         (stream-cons (stream-car s2)
                      (stream-union s1 (stream-cdr s2))))
        (else
         (stream-cons (stream-car s1)
                      (stream-union (stream-cdr s1) (stream-cdr s2)))))))

    ;; 積集合
    (define (stream-intersect s1 s2)
      (stream-delay
       (cond
        ((or (stream-empty? s1) (stream-empty? s2)) nil)
        ((= (stream-car s1) (stream-car s2))
         (stream-cons (stream-car s1)
                      (stream-intersect (stream-cdr s1) (stream-cdr s2))))
        ((< (stream-car s1) (stream-car s2))
         (stream-intersect (stream-cdr s1) s2))
        (else
         (stream-intersect s1 (stream-cdr s2))))))

    ;; 順列の生成
    (define (stream-perm n s)
      (stream-delay
       (if (zero? n)
           (stream-cons '() nil)
           (stream-flatmap
            (lambda (x)
              (stream-map (lambda (y) (cons x y))
                          (stream-perm
                           (- n 1)
                           (stream-filter (lambda (z) (not (eqv? x z))) s))))
            s))))

    ;;
    ;; 素数
    ;;
    (define (sieve s)
      (stream-cons (stream-car s)
                   (sieve (stream-filter
                           (lambda (x) (not (zero? (modulo x (stream-car s)))))
                           (stream-cdr s)))))

    ;; 高速化
    (define *primes*
      (stream-cons 2 (stream-cons 3 (stream-cons 5 (primes-from 7)))))

    (define (primes-from n)
      (if (prime? n)
          (stream-cons n (primes-from (+ n 2)))
          (primes-from (+ n 2))))

    (define (prime? n)
      (let loop ((s (stream-cdr *primes*)))
        (let ((p (stream-car s)))
          (cond
           ((> (* p p) n) #t)
           ((zero? (modulo n p)) #f)
           (else
            (loop (stream-cdr s)))))))

    ))

●簡単な実行例

gosh[r7rs.user]> (import (mylib lazystream1))
gosh[r7rs.user]> (define s1 (stream-unfold (lambda (x) (+ x 1)) 1))
s1
gosh[r7rs.user]> (stream->list (stream-take s1 20))
(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20)
gosh[r7rs.user]> (stream->list (stream-take (stream-map (lambda (x) (* x x)) s1) 20))
(1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361 400)
gosh[r7rs.user]> (stream->list (stream-take (stream-filter even? s1) 20))
(2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40)
gosh[r7rs.user]> (stream->list (stream-take (stream-filter odd? s1) 20))
(1 3 5 7 9 11 13 15 17 19 21 23 25 27 29 31 33 35 37 39)

gosh[r7rs.user]> (define (stream-add s1 s2) (stream-map + s1 s2))
stream-add
gosh[r7rs.user]> (define fibo (stream-cons 0 (stream-cons 1 (stream-add (stream-cdr fibo) fibo))))
fibo
gosh[r7rs.user]> (stream->list (stream-take fibo 30))
(0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765 10946
 17711 28657 46368 75025 121393 196418 317811 514229)

gosh[r7rs.user]> (define hs (stream-cons 1
 (stream-union (stream-map (lambda (x) (* x 2)) hs)
 (stream-union (stream-map (lambda (x) (* x 3)) hs)
 (stream-map (lambda (x) (* x 5)) hs)))))
hs
gosh[r7rs.user]> (stream-for-each (lambda (x) (display x) (display " ")) (stream-take hs 100))
1 2 3 4 5 6 8 9 10 12 15 16 18 20 24 25 27 30 32 36 40 45 48 50 54 60 64 72 75 80 81 90 96 100 
108 120 125 128 135 144 150 160 162 180 192 200 216 225 240 243 250 256 270 288 300 320 324 360 
375 384 400 405 432 450 480 486 500 512 540 576 600 625 640 648 675 720 729 750 768 800 810 864 
900 960 972 1000 1024 1080 1125 1152 1200 1215 1250 1280 1296 1350 1440 1458 1500 1536 #<undef>

gosh[r7rs.user]> (stream->list (stream-perm 4 (stream-range 1 4)))
((1 2 3 4) (1 2 4 3) (1 3 2 4) (1 3 4 2) (1 4 2 3) (1 4 3 2) (2 1 3 4)
 (2 1 4 3) (2 3 1 4) (2 3 4 1) (2 4 1 3) (2 4 3 1) (3 1 2 4) (3 1 4 2)
 (3 2 1 4) (3 2 4 1) (3 4 1 2) (3 4 2 1) (4 1 2 3) (4 1 3 2) (4 2 1 3)
 (4 2 3 1) (4 3 1 2) (4 3 2 1))
gosh[r7rs.user]> (stream->list (stream-take (stream-perm 8 (stream-range 1 8)) 10))
((1 2 3 4 5 6 7 8) (1 2 3 4 5 6 8 7) (1 2 3 4 5 7 6 8) (1 2 3 4 5 7 8 6)
 (1 2 3 4 5 8 6 7) (1 2 3 4 5 8 7 6) (1 2 3 4 6 5 7 8) (1 2 3 4 6 5 8 7)
 (1 2 3 4 6 7 5 8) (1 2 3 4 6 7 8 5))

gosh[r7rs.user]> (define (stream-zip s1 s2) (stream-map list s1 s2))
stream-zip
gosh[r7rs.user]> (stream->list (stream-take (stream-zip s1 s1) 10))
((1 1) (2 2) (3 3) (4 4) (5 5) (6 6) (7 7) (8 8) (9 9) (10 10))
gosh[r7rs.user]> (define *twins* (stream-filter (lambda (xs) (= (- (cadr xs) (car xs)) 2))
(stream-zip *primes* (stream-cdr *primes*))))
*twins*
gosh[r7rs.user]> (stream->list (stream-take *twins* 10))
((3 5) (5 7) (11 13) (17 19) (29 31) (41 43) (59 61) (71 73) (101 103)
 (107 109))
gosh[r7rs.user]> (stream-for-each display (stream-take *twins* 100))
(3 5)(5 7)(11 13)(17 19)(29 31)(41 43)(59 61)(71 73)(101 103)(107 109)(137 139)(149 151)
(179 181)(191 193)(197 199)(227 229)(239 241)(269 271)(281 283)(311 313)(347 349)(419 421)
(431 433)(461 463)(521 523)(569 571)(599 601)(617 619)(641 643)(659 661)(809 811)(821 823)
(827 829)(857 859)(881 883)(1019 1021)(1031 1033)(1049 1051)(1061 1063)(1091 1093)(1151 1153)
(1229 1231)(1277 1279)(1289 1291)(1301 1303)(1319 1321)(1427 1429)(1451 1453)(1481 1483)
(1487 1489)(1607 1609)(1619 1621)(1667 1669)(1697 1699)(1721 1723)(1787 1789)(1871 1873)
(1877 1879)(1931 1933)(1949 1951)(1997 1999)(2027 2029)(2081 2083)(2087 2089)(2111 2113)
(2129 2131)(2141 2143)(2237 2239)(2267 2269)(2309 2311)(2339 2341)(2381 2383)(2549 2551)
(2591 2593)(2657 2659)(2687 2689)(2711 2713)(2729 2731)(2789 2791)(2801 2803)(2969 2971)
(2999 3001)(3119 3121)(3167 3169)(3251 3253)(3257 3259)(3299 3301)(3329 3331)(3359 3361)
(3371 3373)(3389 3391)(3461 3463)(3467 3469)(3527 3529)(3539 3541)(3557 3559)(3581 3583)
(3671 3673)(3767 3769)(3821 3823)#<undef>

初版 2017 年 2 月 17 日
改訂 2020 年 10 月 11 日

非決定性

リストの中から要素をひとつ選ぶ処理を考えます。たとえば、(list-ref ls n) はリスト ls の n 番目の要素を取り出しますが、選ぶ要素を引数 n で指定する必要があります。これに対して、特別な指定をしないで無作為に要素を選ぶことを考えます。このような選択を「非決定的選択」といいます。

ここで、非決定的選択は問題を解くのに都合のいい選択が行われると仮定します。つまり、複数の選択肢の中で解に導くものがいくつか存在するならば、そのうちの一つを選択するのです。たとえば、迷路で分かれ道にきた場合、その中から出口につながる道を一つ選ぶわけです。このような非決定的選択を含む処理 (計算) を「非決定性計算」とか「非決定性」といいます。

このような都合のいい処理を現在のコンピュータで実現することは不可能ですが、バックトラックを使って近似的に実現することは可能です。つまり、ある要素を選んで条件を満たさない場合は、バックトラックして異なる要素を選択すればいいわけです。今回は 独習 Scheme 三週間 Chapter 14 非決定性 を参考に、非決定性計算を行う関数 amb を作ってみましょう。

●amb の動作

関数 amb は 0 個以上の引数を受け取り、その中から一つを選んで返します。次の例を見てください。

(amb 1 2 3) => 1, 2, 3 のどれか 1 つを返す
(amb)       => バックトラックして残りの 2 つのうちの 1 つを返す
(amb)       => バックトラックして最後の 1 つを返す
(amb)       => これ以上バックトラックできないのでエラー

amb は 1 個以上の引数が与えられた場合、その中の 1 つを選んで返します。引数がない場合、バックトラックして次の要素を選びます。今回は先頭から順番に引数を選んでいくことにしましょう。

amb は要素を選ぶだけの単純な動作ですが、複数の amb を組み合わせると複雑な動作が可能になります。リスト (1 2 3) と (4 5 6) から要素を一つずつ取り出して、その組を求める処理は次のようになります。

(list (amb 1 2 3) (amb 4 5 6)) => (1 4)
(amb) => (1 5)
(amb) => (1 6)
(amb) => (2 4)
(amb) => (2 5)
(amb) => (2 6)
(amb) => (3 4)
(amb) => (3 5)
(amb) => (3 6)
(amb) => エラー

最初の amb で 1 を選び、次の amb で 4 が選ばれるので、最初の値は (1 4) になります。次に、amb を評価すると、2 番目の amb がバックトラックして、次の要素 5 を選びます。したがって、返り値は (1 5) になります。そして、その次の返り値は (1 6) になります。

2 番目の amb で要素がなくなると、最初の amb にバックトラックします。すると、次の要素 2 を選び、2 番目の amb を評価します。ここで 2 番目の amb は新しく評価されることに注意してください。引数 4, 5, 6 を順番に選んでいくので、返り値は (2 4) になります。あとはバックトラックするたびに組が生成され、全ての組み合わせを求めることができます。

ただし、amb を関数として定義すると、次の場合は正常に動作しません。

(amb (amb) 1) => 1 を返すはずがエラーになる

この場合、引数の (amb) が失敗しても次の要素 1 を選ぶはずなのですが、Scheme の関数は先に引数 (amb) を評価するのでエラーになるのです。したがって、amb はマクロで定義する必要があります。

●関数版 amb の作成

いきなりマクロを作るのは大変なので、まず最初に関数版 amb から作りましょう。プログラムは次のようになります。なお、今回のプログラムは拙作のライブラリ (mylib list) を使います。

リスト : 非決定性 amb (関数版)

(import (scheme base) (mylib list))

;;; バックトラックするときの継続を格納する
(define *amb-fail* #f)

;;; 初期化
(define (initialize-amb-fail)
  (set! *amb-fail*
        (lambda () (error "amb tree exhausted"))))

;;; 非決定性 amb (関数版)
(define (amb . args)
  (if (null? args)
      (*amb-fail*)
      (let ((prev-fail *amb-fail*))
        (call/cc
         (lambda (cont-s)
           (for-each
            (lambda (x)
              (call/cc
               (lambda (cont-f)
                 (set! *amb-fail*
                       (lambda ()
                         (set! *amb-fail* prev-fail)
                         (cont-f #f)))
                 (cont-s x))))
            args)
           (prev-fail))))))

*amb-fail* はバックトラックするときの継続を格納します。関数 initialize-amb-fail は *amb-fail* を初期化します。これは error でエラー "amb tree exhausted" を送出するだけです。関数 amb は引数のリスト args の要素を先頭から順番に取り出していきます。この処理は拙作のページ 継続と継続渡しスタイル で作成したイテレータとよく似ています。

最初に args が空リストかチェックします。そうであれば、*amb-fail* に格納されている継続を実行します。引数がある場合は、先頭から順番に取り出していきます。まず、*amb-fail* に格納されている継続を局所変数 prev-fail に保存します。次に、call/cc で要素を返すための継続を取り出して cont-s に渡します。そして、for-each で args の要素を順番にアクセスします。

ラムダ式の中でバックトラックするときの継続を取り出して cont-f に渡します。そして、*amb-fail* に cont-f を呼び出す処理をセットします。この処理の中で、prev-fail に保存しておいた継続を *amb-fail* に戻してから、cont-f を呼び出してバックトラックするようにします。最後に (cont-s x) で要素 x を返します。

これで、(amb) でバックトラックすると for-each の処理に戻るので、要素を一つずつ取り出すことができます。for-each が終了したら prev-fail を呼び出すことに注意してください。これで、以前に実行した amb にバックトラックすることができます。なお、このときの prev-fail と *amb-fail* は同じ値なので、(*amb-fail*) を評価してもかまいません。

それでは簡単な実行例を示しましょう。

gosh[r7rs.user]> (initialize-amb-fail)
#<closure ((initialize-amb-fail initialize-amb-fail))>
gosh[r7rs.user]> (amb 1 2 3)
1
gosh[r7rs.user]> (amb)
2
gosh[r7rs.user]> (amb)
3
gosh[r7rs.user]> (amb)
*** ERROR: amb tree exhausted

gosh[r7rs.user]> (list (amb 1 2) (amb 3 4))
(1 3)
gosh[r7rs.user]> (amb)
(1 4)
gosh[r7rs.user]> (amb)
(2 3)
gosh[r7rs.user]> (amb)
(2 4)
gosh[r7rs.user]> (amb)
*** ERROR: amb tree exhausted

もう一つ簡単な例として、順列を生成するプログラムを作ってみましょう。次のリストを見てください。

リスト : 順列の生成

;;; 条件 pred を満たさない場合はバックトラックする
(define (assert pred)
  (if (not pred) (amb)))

;;; ls から n 個を取り出す順列
(define (perm n ls)
  (let loop ((n n) (a '()))
    (if (zero? n)
        (reverse a)
        (let ((x (apply amb ls)))
          (assert (not (member x a)))
          (loop (- n 1) (cons x a))))))

assert は pred が偽の場合は (amb) を評価してバックトラックします。amb を使うと順列を生成する関数 perm は簡単に実現できます。amb でリストの要素を 1 つ選び、それが順列 a に含まれていないことを assert で確認します。同じ要素が含まれていれば、バックトラックして異なる要素を選びます。n 個の要素を選んだら reverse でリスト a を反転した値を返します。

それでは実行例を示します。

gosh[r7rs.user]> (initialize-amb-fail)
#<closure ((initialize-amb-fail initialize-amb-fail))>
gosh[r7rs.user]> (perm 4 '(a b c d))
(a b c d)
gosh[r7rs.user]> (amb)
(a b d c)
gosh[r7rs.user]> (amb)
(a c b d)
gosh[r7rs.user]> (amb)
(a c d b)
gosh[r7rs.user]> (amb)
(a d b c)
gosh[r7rs.user]> (amb)
(a d c b)

このように、バックトラックするたびに順列を一つずつ生成することができます。

●解をすべて求める

非決定性のプログラムはバックトラックすることで全ての解を求めることができます。このとき、見つけた解をリストに格納して返す関数があると便利です。次のリストを見てください。

リスト : 見つけた解をリストに格納して返す

(define (bag-of func)
  (let ((prev-fail *amb-fail*)
        (result '()))
    (if (call/cc
         (lambda (cont)
           (set! *amb-fail* (lambda () (cont #f)))
           (set! result (cons (func) result))
           (cont #t)))
        (*amb-fail*))
    (set! *amb-fail* prev-fail)
    (reverse! result)))

関数 bag-of は引数 func を実行して、その結果をリストに格納して返します。func は非決定性計算を行う関数です。最初に *amb-fail* を局所変数 prev-fail に保存します。func の返り値は局所変数 result に格納します。次に、call/cc で脱出先の継続 cont を取り出して、*amb-fail* に (lambda () (cont #f)) をセットします。そして、関数 func を評価して、その返り値を result の先頭に追加します。

(cont #t) を評価すると、call/cc の返り値が #t となり、if の then 節が評価されるので、(*amb-fail*) が実行されます。func の処理にバックトラックして、解が見つかればその値を返します。つまり、解が存在する限り次の処理が繰り返されます。

(set! result (cons (func) result)) -> (cont #t) -> (*amb-fail*)

これで複数の解を result に格納することができます。func で解が見つからない場合、最初に *amb-fail* にセットした (lambda () (cont #f)) が実行されます。その結果、if 条件が偽と判定され、バックトラックを終了します。*amb-fail* を元の値に戻し、result を reverse! で反転して返します。

簡単な実行例を示しましょう。

gosh[r7rs.user]> (initialize-amb-fail)
#lt;closure ((initialize-amb-fail initialize-amb-fail))>
gosh[r7rs.user]> (bag-of (lambda () (amb 1 2 3 4)))
(1 2 3 4)
gosh[r7rs.user]> (bag-of (lambda () (list (amb 1 2 3) (amb 4 5 6))))
((1 4) (1 5) (1 6) (2 4) (2 5) (2 6) (3 4) (3 5) (3 6))
gosh[r7rs.user]> (bag-of (lambda () (perm 3 '(a b c))))
((a b c) (a c b) (b a c) (b c a) (c a b) (c b a))

このように bag-of を使って全ての解を求めることができます。

●論理パズル

それでは簡単な例題として論理パズルを解いてみましょう。

[問題]

3人の友達が、あるプログラミング競技会で1位、2位、3位になった。この3人は、名前も、好きなスポーツも、国籍も異なる。Michael はバスケットが好きで、アメリカ人よりも上位であった。イスラエル人の Simon はテニスをする者よりも上位であった。クリケットをするものが1位であった。誰がオーストラリア人か? Richard はどのようなスポーツをするか?

簡単な論理パズルなので、プログラムを作る前に考えてみてください。

最初にデータ構造とアクセス関数を定義します。データはリストで表します。

(名前 順位 国籍 スポーツ)

このデータを amb で作成します。次のリストを見てください。

リスト : データとアクセス関数の定義

;;; データの生成
(define (make-data name)
  (list name 
        (amb 1 2 3)
        (amb 'US 'IL 'AU)
        (amb 'basket 'cricket 'tennis)))

;;; アクセス関数
(define (get-rank x) (list-ref x 1))
(define (get-nation x) (list-ref x 2))
(define (get-sports x) (list-ref x 3))

amb で順位 (1, 2, 3)、国籍 (US, IL, AU)、スポーツ (basket, cricket, tennis) の中から要素を一つ選びます。バックトラックすると異なる要素が選ばれて、新しいデータが生成されます。

次は問題を解くための補助関数を作ります。

リスト : 補助関数の定義

;;; 国籍が x の人を探す
(define (find-nation x . ls)
  (find-if (lambda (a) (eq? x (get-nation a))) ls))

;;; スポーツ x が好きな人を探す
(define (find-sports x . ls)
  (find-if (lambda (a) (eq? x (get-sports a))) ls))

;;; 要素が異なっているか
(define (check? . ls)
  (duplicates? (lambda (x y) (any eqv? (cdr x) (cdr y))) ls))

find-nation はリスト ls の中から国籍が x の要素を返します。find-sports は好きなスポーツが x の要素を返します。関数 find-if は第 1 引数の述語を満たす要素を線形探索します。duplicates? は重複要素があるかチェックします。どちらの関数も拙作のライブラリ (mylib list) に定義されています。

check? は duplicates? を呼び出して、重複した要素が有れば #t を返します。ラムダ式の引数 x, y にはデータ (名前 順位 国籍 スポーツ) が渡されます。名前以外で等しい要素があれば #t を返します。この処理は関数 any を使うと簡単です。any も (mylib list) に定義されています。

論理パズルの解法プログラムは次のようになります。

リスト : 論理パズルの解法

(define (puzzle)
  (let ((m (make-data 'Michael))
        (s (make-data 'Simon))
        (r (make-data 'Richard)))
    (assert (not (check? m s r)))
    (assert (eq? (get-sports m) 'basket))
    (assert (not (eq? (get-nation m) 'US)))
    (assert (eq? (get-nation s) 'IL))
    (assert (< (get-rank m) (get-rank (find-nation 'US m s r))))
    (assert (< (get-rank s) (get-rank (find-sports 'tennis m s r))))
    (assert (= (get-rank (find-sports 'cricket m s r)) 1))
    (list m s r)))

最初に make-data でデータを作成し、局所変数 m, s, r にセットします。そして、check? で順位、国籍、スポーツで要素が重複していないかチェックします。あとは問題の条件を assert でチェックしていくだけです。

  1. Michael の好きなスポーツはバスケットである。
  2. Michael の国籍はアメリカではない。
  3. Simon の国籍はイスラエルである。
  4. Michael は国籍がアメリカの人よりも上位である。
  5. Simon はテニスが好きな人よりも上位である。
  6. クリケットが好きな人が1位である。

条件を満たさない場合はバックトラックして新しいデータを生成します。最後に、見つけた解を出力します。とても簡単ですね。実行結果は次のようになります。

gosh[r7rs.user]> (initialize-amb-fail)
#<closure ((initialize-amb-fail initialize-amb-fail))>
gosh[r7rs.user]> (puzzle)
((Michael 2 AU basket) (Simon 1 IL cricket) (Richard 3 US tennis))
gosh[r7rs.user]> (amb)
*** ERROR: amb tree exhausted

解は 1 通りで、1位が Simon, 2位が Michael, 3位が Richard になります。ちなみに、最後の条件がない場合は 2 通りの解が出力されます。興味のある方は試してみてください。

●マクロ版 amb の作成

それではマクロ版 amb を作りましょう。次のリストを見てください。

リスト : 非決定性 amb (マクロ版)

;;; バックトラックするときの継続を格納する
(define *amb-fail* #f)

;;; 初期化
(define (initialize-amb-fail)
  (set! *amb-fail*
        (lambda () (error "amb tree exhausted"))))

;;; 非決定性 amb (マクロ版)
(define-syntax amb
  (syntax-rules ()
    ((_) (*amb-fail*))
    ((_ a) a)
    ((_ a ...)
     (let ((prev-fail *amb-fail*))
       (call/cc
        (lambda (cont-s)
          (call/cc
           (lambda (cont-f)
             (set! *amb-fail*
                   (lambda ()
                     (set! *amb-fail* prev-fail)
                     (cont-f #f)))
             (cont-s a)))
          ...
          (prev-fail)))))))

引数がない場合は *amb-fail* に格納されている継続を実行します。引数が一つの場合はその評価結果を返します。ここまでは簡単ですね。引数が複数ある場合、関数版では for-each を使って実現しましたが、マクロ版では引数の数だけマクロ展開することにします。

引数 a は S 式 (call/cc (lambda (cont-f) ... (cont-s a))) でマクロ展開されます。このあとで省略子 ... を指定すると、残りの引数にもこの S 式が適用されてマクロ展開されます。たとえば、引数が 3 つある場合、S 式 (call/cc (lambda (cont-f) ... (cont-s x))) が 3 つマクロ展開され、(call/cc ...) (call/cc ...) (call/cc ...) となるわけです。ここで (cont-s x) の x は与えられた引数になります。

したがって、最初の引数を返したあとの継続は 2 番目の (call/cc ...) になり、2 番目の引数を返したあとの継続は 3 番目の (call/cc ...) になります。そして、最後の引数を返したあとの継続が (prev-fail) になります。この継続を実行すると以前に実行した amb の処理にバックトラックします。

それでは簡単な実行例を示しましょう。

gosh[r7rs.user]> (initialize-amb-fail)
#<closure ((initialize-amb-fail initialize-amb-fail))>
gosh[r7rs.user]> (amb 1 2 3)
1
gosh[r7rs.user]> (amb)
2
gosh[r7rs.user]> (amb)
3
gosh[r7rs.user]> (amb)
*** ERROR: amb tree exhausted

gosh[r7rs.user]> (list (amb 'a 'b) (amb 'c 'd))
(a c)
gosh[r7rs.user]> (amb)
(a d)
gosh[r7rs.user]> (amb)
(b c)
gosh[r7rs.user]> (amb)
(b d)
gosh[r7rs.user]> (amb)
*** ERROR: amb tree exhausted

gosh[r7rs.user]> (amb 1 (amb) 2)
1
gosh[r7rs.user]> (amb)
2
gosh[r7rs.user]> (amb)
*** ERROR: amb tree exhausted

amb はマクロなので (amb (amb) 1) も正常に動作します。

ところで、bag-of もマクロにすると便利です。プログラムは次のようになります。

リスト : マクロ版 bag-of

;;; マクロ版 bag-of
(define-syntax bag-of
  (syntax-rules ()
    ((_ e)
     (let ((prev-fail *amb-fail*)
           (results '()))
       (if (call/cc
            (lambda (cont)
              (set! *amb-fail* (lambda () (cont #f)))
              (set! results (cons e results))
              (cont #t)))
           (*amb-fail*))
       (set! *amb-fail* prev-fail)
       (reverse! results)))))

関数 bag-of をマクロ定義しただけなので、とくに難しいところはないと思います。簡単な実行例を示します。

gosh[r7rs.user]> (initialize-amb-fail)
#<closure ((initialize-amb-fail initialize-amb-fail))>
gosh[r7rs.user]> (bag-of (list (amb 1 2 3) (amb 1 2 3)))
((1 1) (1 2) (1 3) (2 1) (2 2) (2 3) (3 1) (3 2) (3 3))

●地図の配色問題

最後に地図の配色問題を解いてみましょう。今回は、下図に示す簡単な地図を 4 色で塗り分けてみます。

なお、地図は下記文献 (276頁) より引用しました。

プログラムは次のようになります。

リスト : 地図の配色問題

(define (color-map)
  (define regions '(a b c d e f))
  (define adjacent '((a b c d) (b a c e) (c a b d e f)
                     (d a c f) (e b c f) (f c d e)))
  (define (get-color p ls) (cdr (assoc p ls)))
  (define (same-color? region ls)
    (let ((color (get-color region ls)))
      (find-if (lambda (x) (eq? (get-color x ls) color))
               (cdr (assoc region adjacent)))))
  ;;
  (let ((m (map (lambda (x) (cons x (amb 'blue 'green 'red 'yellow))) regions)))
    (for-each
      (lambda (x)
        (assert (not (same-color? x m))))
      regions)
    m))

地域と色の対応は連想リストで表します。そして、map で連想リストを作るときに、amb で色を選んでセットするところがポイントです。そして、隣り合った地域で同じ色が使われていないか関数 same-color? でチェックします。同じ色が使われていたら、バックトラックして異なる色を選び直します。最後に連想リストを返します。

実行結果を示します。

gosh[r7rs.user]> (initialize-amb-fail)
#<closure ((initialize-amb-fail initialize-amb-fail))>
gosh[r7rs.user]> (color-map)
((a . blue) (b . green) (c . red) (d . green) (e . blue) (f . yellow))

このように amb を使うと問題を簡単に解くことができますが、生成されるデータ数が多くなると実行時間が極端に遅くなります。ようするに「生成検定法」と同じなので、なるべく無駄なデータを生成しないように工夫する必要があります。ご注意くださいませ。


初版 2009 年 7 月 11 日
改訂 2020 年 10 月 11 日

Copyright (C) 2009-2020 Makoto Hiroi
All rights reserved.

[ PrevPage | Scheme | NextPage ]