M.Hiroi's Home Page

Clojure Programming

お気楽 Clojure プログラミング超入門


Copyright (C) 2025 Makoto Hiroi
All rights reserved.

並行プログラミング (2)

core.async は Rich Hickey 氏が開発している非同期プログラミングのためのライブラリです。今回は core.async を使って非同期プログラミングに挑戦してみましょう。

●core.async のインストール

core.async は次に示すページで公開されています。

インストール方法は上記ページの README に書かれています。M.Hiroi は設定ファイル deps.edn の :deps に core.async を追加しました。

リスト : deps.edn

{
 :paths ["."]
 :deps { org.clojure/core.async {:mvn/version "1.8.741"} }
}

deps.edn はカレントディレクトリにあるものとします。deps.edn を修正したあと最初に clj を起動すると、async.core と必要なライブラリがインストールされます。

$ clj
Downloading: org/clojure/core.async/1.8.741/core.async-1.8.741.pom from central
Downloading: org/clojure/tools.analyzer.jvm/1.3.2/tools.analyzer.jvm-1.3.2.pom from central
Downloading: org/clojure/tools.reader/1.5.0/tools.reader-1.5.0.pom from central
Downloading: org/ow2/asm/asm/9.2/asm-9.2.pom from central
Downloading: org/ow2/ow2/1.5/ow2-1.5.pom from central
Downloading: org/clojure/tools.analyzer/1.2.0/tools.analyzer-1.2.0.pom from central
Downloading: org/clojure/core.memoize/1.1.266/core.memoize-1.1.266.pom from central
Downloading: org/clojure/core.cache/1.1.234/core.cache-1.1.234.pom from central
Downloading: org/clojure/data.priority-map/1.2.0/data.priority-map-1.2.0.pom from central
Downloading: org/clojure/tools.analyzer/1.2.0/tools.analyzer-1.2.0.jar from central
Downloading: org/clojure/tools.analyzer.jvm/1.3.2/tools.analyzer.jvm-1.3.2.jar from central
Downloading: org/clojure/tools.reader/1.5.0/tools.reader-1.5.0.jar from central
Downloading: org/ow2/asm/asm/9.2/asm-9.2.jar from central
Downloading: org/clojure/core.memoize/1.1.266/core.memoize-1.1.266.jar from central
Downloading: org/clojure/data.priority-map/1.2.0/data.priority-map-1.2.0.jar from central
Downloading: org/clojure/core.async/1.8.741/core.async-1.8.741.jar from central
Downloading: org/clojure/core.cache/1.1.234/core.cache-1.1.234.jar from central
Clojure 1.12.0
user=> 

ライブラリ async.core は require でロードします。

(require '[clojure.core.async :refer [...] :as ...)

ns マクロの :require を使ってもかまいません。

●core.async の特徴

core.async は Go 言語の「チャネル (channel)」に似た機構を用いて、並行処理をシンプルに記述できるのが特徴です。core.async には Go 言語の goroutine のような「軽量プロセス」を実現する go ブロックが用意されていて、チャネルを使って go ブロック間でデータの送受信を行うことができます。チャネルへのデータの送受信は <! や >! を使って記述します。これにより、コルーチンのように、非同期処理を同期的なコードのように記述することができます。

簡単な例を示しましょう。

user=> (require '[clojure.core.async :as a])
nil

user=> (def c (a/chan))
#'user/c
user=> (a/go-loop [] (println (a/<! c)) (recur))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x539953af 
"clojure.core.async.impl.channels.ManyToManyChannel@539953af"]

user=> (a/go (dotimes [x 10] (a/>! c x)))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0xa23b96b 
"clojure.core.async.impl.channels.ManyToManyChannel@a23b96b"]
0
1
2
3
4
5
6
7
8
9

user=>

チャネルは関数 chan で生成します。(go-loop ...) は (go (loop ...)) と同じです。go はマクロで、go ブロック本体を非同期で実行します。Go 言語の goroutine のように、go ブロックは軽量で高速に動作すると言われています。go は本体の処理結果を受け取るチャネルを直ぐに返します。上の例では、チャネル c から <! でデータを受け取り、それを println で表示するだけです。

次の go ブロックで、0 から 9 までの数値をチャネル c に送信します。データの送信は >! を使います。すると、最初に起動した go ブロックがデータを受信して、それを println で表示します。なお、<! と >! は go ブロックの中でしか使用することはできません。ご注意くださいませ。

●チャネル

async.core のチャネルはデータを送受信するために使用する「キュー (queue)」のことで、簡単に言えば他の go ブロックやスレッドとの通信路になります。チャネルは core.async で用意されているデータ型の一つで、関数の引数に渡したり、関数の中でチャネルを生成して、それを返すこともできます。

チャネルは関数 chan で生成します。

chan
chan size

引数なしの chan はバッファなしのチャネルを生成します。この場合、バッファリングは行われません。大きさ size を指定すると、バッファサイズが size のチャネルを生成します。

バッファリングを行わない場合、通信を行う go ブロックの実行は「同期」されます。つまり、データを書き込む側と読む出す側が揃うまで、どちらか一方の実行は休止します。バッファリングを行う場合、チャネルのバッファはキューとして動作します。この場合、通信を行う go ブロックの実行は「非同期」になります。

たとえば、バッファにデータがある場合、読み出す側はデータを読み込んでプログラムの実行を続けます。読み出す側が休止するのはバッファにデータがないときだけです。逆に、書き込む側はバッファに空きがあれば、データを書き込んでプログラムの実行を続けます。バッファが満杯のときだけ休止します。

この他に、chan には以下の関数の返り値を指定することができます。

上記の関数を使うと、バッファが満杯時の動作を変更することができます。

●データの送受信

チャネルは nil 以外の任意のデータを送受信することができます。また、チャネルは go ブロックだけではなく、スレッドでも使用することができます。core.async には future と同じような機能を持つ thread が用意されているので、スレッドでも簡単にチャネルを使うことができます。

go ブロックでデータの送受信を行うには <! と >! を使います。thread で使用することはできません。

<! ch
>! ch data

<! はチャネルにデータが無いとき、>! はチャネルが満杯のとき、go ブロックの実行を休止します。core.async のリファレンスには「パーキング (parking)」と記載されています。

go ブロック以外でデータの送受信を行うには <!! と >!! を使います。

<!! ch
>!! ch data

<!! はチャネルにデータが無いとき、>!! はチャネルが満杯のとき、プログラムの実行を休止します。core.async のリファレンスには「ブロッキング (blocking)」と記載されています。

チャネルは関数 close! でクローズすることができます。

close! ch

チャネルのクローズすると、データを送信することができなくなります。クローズされたチャネルからデータを受信すると、チャネルにまだデータが残っていれば、それを受信することはできます。データが無い場合は直ぐに nil を返します。

簡単な実行例を示します。

$ clj
Clojure 1.12.0
user=> (require '[clojure.core.async :as a])
nil
user=> (def ch (a/chan 4))
#'user/ch

user=> (a/>!! ch 1)
true
user=> (a/close! ch)
nil
user=> (a/>!! ch 2)
false

user=> (a/<!! ch)
1
user=> (a/<!! ch)
nil

●go と thread

go は特別なスレッドプール内で本体を非同期的に実行します。返り値は go ブロックの結果を受け取るチャネルです。thread は future と同じく本体を別スレッドで実行します。返り値は thread 本体の結果を受け取るチャネルです。

(go & body) => chan
(thread & body) => chan

go と thread はマクロです。go や thread は並行に動作するので、本体の実行終了を待つことなく、そのあとに記述されているプログラムを実行します。簡単な例を示しましょう。0.5 秒間隔で name を n 回表示するプログラムを作ります。

リスト : 0.5 秒間隔で name を n 回表示する (go01.clj)

(ns go01
  (:require [clojure.core.async :refer [go timeout <!! <!]]))

;; wait 間隔で名前と回数を表示
(defn display-name [name n wait]
  (go (dotimes [i n]
        (printf "%s %d\n" name i)
        (flush)
        (<! (timeout wait)))
      (str name " done")))

(defn -main []
  (let [c1 (display-name "foo" 5 500)
        c2 (display-name "bar" 7 500)]
    (println (<!! c1))
    (println (<!! c2))))

関数 display-name は処理本体を go マクロで囲んでいるので、本体は他のプログラムと平行に動作します。dotimes で name を n 回表示したあと、文字列 name + " done" を返します。関数 timeout は、呼び出してから wait msec 経過したあと、自動的にクローズされるチャネルを返します。<! でチャネルからデータを受信すると、その間だけ go ブロックの処理がパークされることになります。

関数 -main では、display-name を呼び出して、返り値のチャネルを変数 c1, c2 に受け取ります。c1 と c2 から <!! でデータを受信して println で表示します。<!! は処理をブロックするので、go ブロックの処理が終了するまでメインプログラムの実行は休止することになります。

それでは実行してみましょう。

$ clj -M -m go01
foo 0
bar 0
foo 1
bar 1
bar 2
foo 2
bar 3
foo 3
foo 4
bar 4
bar 5
foo done
bar 6
bar done

2 つの go ブロックが平行に動作していることがわかります。ご参考までに thread を使ったプログラムと実行結果を示します。

リスト : 0.5 秒間隔で Name を N 回表示する (async01.clj)

(ns async01
  (:require [clojure.core.async :refer [thread timeout <!!]]))

;; wait 間隔で名前と回数を表示
(defn display-name [name n wait]
  (thread
    (dotimes [i n]
      (printf "%s %d\n" name i)
      (flush)
      (<!! (timeout wait)))
    (str name " done")))

(defn -main []
  (let [c1 (display-name "foo" 5 500)
        c2 (display-name "bar" 7 500)]
    (println (<!! c1))
    (println (<!! c2))))
$ clj -M -m async01
foo 0
bar 0
foo 1
bar 1
foo 2
bar 2
foo 3
bar 3
foo 4
bar 4
bar 5
foo done
bar 6
bar done

●go ブロックの同期

チャンネルを使って同期をとることで、複数の go ブロックを協調的に動作させることができます。たとえば、1 文字を表示する go ブロックを複数個作成し、"hey! " とういう文字列を複数回画面に表示するプログラムを作ってみましょう。次のリストを見てください。

リスト : go ブロックの同期 (go02.clj)

(ns go02
  (:require [clojure.core.async :refer [chan go-loop timeout <! >! <!! >!!]]))

(defn make-routine [code ch]
  (let [out (chan)]
    (go-loop []
      (<! ch)
      (print code)
      (flush)
      (<! (timeout 50))
      (>! out 0)
      (recur))
    out))

(defn -main []
  (let [ch1 (chan)
        ch2 (make-routine "h" ch1)
        ch3 (make-routine "e" ch2)
        ch4 (make-routine "y" ch3)
        ch5 (make-routine "!" ch4)
        ch6 (make-routine " " ch5)]
    (dotimes [_ 10]
      (>!! ch1 0)
      (<!! ch6))))
$ clj -M -m go02
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey!

make-routine は引数の文字 code を表示する go ブロックを起動します。最初に、文字を表示したことを通知するチャネル out を生成し、go ブロックを起動してからチャネル out を返します。ブロック本体は無限ループになっていて、チャネル ch からデータを受信するのを待ちます。

次に、code を表示してから 50 msec 後に out へデータを送信します。たとえば、go ブロック 1 の送信チャネルを go ブロック 2 の受信チャネルに設定すると、go ブロック 1 で文字が表示されたあと、go ブロック 2 で文字が表示されることになります。

-main 関数では、最初にチャネル ch1 を生成し、それを make-routine に渡して返り値を変数 ch2 にセットします。これで ch1 にデータを送信すると、"h" が表示されて ch2 にデータが送信されます。次に、ch2 を make-routine に渡して、返り値を ch3 にセットします。ch2 にデータを送信すると、"e" が表示されて ch3 にデータが送信されます。あとは、表示する文字だけ make-routine で go ブロックを生成します。

最後に、dotimes で ch1 にデータを送信して、画面に文字列を表示します。"hey! " が表示されたあと、ch6 にデータが送信されるので、それを受信するのを待ちます。あとは、これを 10 回繰り返すだけです。

なお、thread でも同じことができます。ご参考までに、thread で書き直したプログラムを示します。

リスト : thread の同期 (async02.clj)

(ns async02
  (:require [clojure.core.async :refer [chan thread timeout <!! >!!]]))

(defn make-routine [code ch]
  (let [out (chan)]
    (thread
      (while true
        (<!! ch)
        (print code)
        (flush)
        (<!! (timeout 50))
        (>!! out 0)))
    out))

(defn -main []
  (let [ch1 (chan)
        ch2 (make-routine "h" ch1)
        ch3 (make-routine "e" ch2)
        ch4 (make-routine "y" ch3)
        ch5 (make-routine "!" ch4)
        ch6 (make-routine " " ch5)]
    (dotimes [_ 10]
      (>!! ch1 0)
      (<!! ch6))))
$ clj -M -m async02
hey! hey! hey! hey! hey! hey! hey! hey! hey! hey!

●データの交換

もうひとつ、簡単な例を示します。今度はお互いにデータを交換する処理を考えてみましょう。次のリストを見てください。

リスト : データの送受信 (go03.clj)

(ns go03
  (:require [clojure.core.async :refer [chan go go-loop timeout <! >! <!! >!!]]))

;; リクエスト
(defrecord Req [color replay])

;; color の送信
(defn send-color [n color ch]
  (go
    (let [in-ch (chan)
          v (->Req color in-ch)]
      (dotimes [_ n]
        (>! ch v)
        (<! in-ch)
        (<! (timeout 100)))
      (>! ch 0))))

;; color の受信
(defn receive-color [n ch]
  (go-loop [n n]
    (if (zero? n)
      :done
      (let [req (<! ch)]
        (if (= req 0)
          (recur (dec n))
          (do
            (println (:color req))
            (>! (:replay req) 0)
            (recur n)))))))

(defn -main []
  (let [ch (chan 8)]
    (send-color 8 "red" ch)
    (send-color 7 "blue" ch)
    (send-color 6 "green" ch)
    (println (<!! (receive-color 3 ch)))))

関数 send-color はチャネル ch に色データ color を n 回送信します。送信したあと、送信先からのメッセージを受信するまで待ちます。このように返信が必要な場合、メッセージを送信するときに受信用のチャネルもいっしょに送ります。このため、色データと受信用のチャネルを格納するレコード Req を用意します。Req のフィールド変数 Reply は、受け取った側が書き込むチャネルになります。

send-color は最初に受信用のチャネルを chan で生成し、送信するデータを ->Req で生成して変数 v にセットします。次の dotimes で、v を ch へ送信したあと、in からデータがくるまで待ちます。これを n 回繰り返したあと、最後に 0 を送信します。

receive-color の引数 n は起動した go ブロックの個数、ch は go ブロックからのデータを受け取るチャネルです。受信データ req が 0 ならば、go ブロックが一つ終了したので n の値を -1 します。そうでなければ、色データを画面に出力して、チャネル req.Reply へ 0 を送信します。これで相手方に返信することができます。

それでは実行してみましょう。

$ clj -M -m go03
red
blue
green
green
blue
red
green
blue
red
green
blue
red
green
blue
red
blue
red
green
blue
red
red
:done

正常に動作していますね。

当然ですが、thread でも同じことができます。ご参考までに、thread で書き直したプログラムを示します。

リスト : データの送受信 (async03.clj)

(ns async03
  (:require [clojure.core.async :refer [chan thread timeout <!! >!!]]))

;; リクエスト
(defrecord Req [color replay])

;; color の送信
(defn send-color [n color ch]
  (thread
    (let [in-ch (chan)
          v (->Req color in-ch)]
      (dotimes [_ n]
        (>!! ch v)
        (<!! in-ch)
        (<!! (timeout 100)))
      (>!! ch 0))))

;; color の受信
(defn receive-color [n ch]
  (thread
    (loop [n n]
      (if (zero? n)
        :done
        (let [req (<!! ch)]
          (if (= req 0)
            (recur (dec n))
            (do
              (println (:color req))
              (>!! (:replay req) 0)
              (recur n))))))))

(defn -main []
  (let [ch (chan 8)]
    (send-color 8 "red" ch)
    (send-color 7 "blue" ch)
    (send-color 6 "green" ch)
    (println (<!! (receive-color 3 ch)))))
$ clj -M -m async03
red
blue
green
blue
red
green
blue
red
green
blue
red
green
blue
red
green
blue
red
green
blue
red
red
:done

●alts! と alts!!

複数のチャネルを扱う場合、alts! や alts!! を使うとデータの送受信が可能なチャネルを選んで処理することができます。

(alts! [c1 c2 ...] & options)                  => [data ch]
(alts! [[c1 data1] [ch2 data2] ...] & options) => [data ch]

第 1 引数のベクタに処理するチャネルを指定します。チャネルだけだと受信、ベクタ [チャネル データ] だと指定したチャネルにデータを送信します。送受信は混在してもかまいません。返り値はベクタ [data ch] で、ch が操作したチャネルです。受信時には data に受信したデータがセットされます。チャネルがクローズされているか timeout チャネルの場合は nil になります。送信の場合、成功したときは true になり、失敗したときには false になります。

オプションに :default data を指定すると、操作可能なチャネルが見つからないとき [data :default] を返します。同時に複数のチャネルが処理可能な場合、処理するチャネルはランダムに選択されます。オプション :priority に true を指定すると、チャネルの優先順位は第 1 引数のベクタで並べた順番になります。

簡単な例を示しましょう。

リスト : alts! の使用例 (go04.clj)

(ns go04
  (:require [clojure.core.async :refer [alts! chan go go-loop timeout <! >! <!! >!!]]))

(defn foo [n ch quit]
  (go-loop [n n]
    (if (zero? n)
      (>! quit 0)
      (do
        (>! ch n)
        (<! (timeout 500))
        (recur (dec n))))))

(defn bar [n ch quit]
  (go-loop [n n]
    (if (zero? n)
      (>! quit 0)
      (do
        (>! ch (/ n 10.0))
        (<! (timeout 250))
        (recur (dec n))))))

(defn baz [n ch quit]
  (go-loop [n n]
    (if (zero? n)
      (>! quit 0)
      (do
        (>! ch (str (* n 10)))
        (<! (timeout 750))
        (recur (dec n))))))

(defn receive [ch1 ch2 ch3 quit]
  (go-loop [n 3]
    (if (zero? n)
      :done
      (let [[r c] (alts! [ch1 ch2 ch3 quit] :default "None")]
        (cond
          (= c quit)
          (recur (dec n))
          (= c :default)
          (do (println r)
              (<! (timeout 250))
              (recur n))
          :else
          (do (println r)
              (recur n)))))))

(defn -main []
  (let [ch1  (chan)
        ch2  (chan)
        ch3  (chan)
        quit (chan)]
    (foo 6 ch1 quit)
    (bar 8 ch2 quit)
    (baz 4 ch3 quit)
    (println (<!! (receive ch1 ch2 ch3 quit)))))

関数 foo は int を、bar は double を、baz は string を送信します。そして、どの関数も終了を通知するチャネル quit にデータを送信します。関数 receive は go-loop で本体を繰り返し評価し、alts! でチャネルを監視します。

どのチャネルも受信できない場合は [:default "None"] が返されます。ここで None を画面に表示して 250 msec だけ待機します。ch1, ch2, ch3 からデータを受信した場合は、そのデータを画面へ出力します。quit からデータを受信した場合は go ブロックが一つ終了したので n の値を -1 します。

それでは実行してみましょう。

$ clj -M -m go04
40
6
0.8
None
0.7
None
5
0.6
None
30
0.5
None
0.4
4
None
0.3
None
20
0.2
3
None
0.1
None
2
None
10
None
1
None
None
:done

正常に動作していますね。もちろん、thread を使ってプログラムを作ることもできます。この場合、alts! の代わりに alts!! を使います。興味のある方はプログラムを作ってみてください。

●タイムアウトの処理

alts! と timeout を組み合わせると、待ち時間を設定することができます。簡単な例を示しましょう。

$ clj
Clojure 1.12.0
user=> (require '[clojure.core.async :as a])
nil
user=> (defn fibo [n] (if (< n 2) n (+ (fibo (- n 2)) (fibo (dec n)))))
#'user/fibo

user=> (time (fibo 38))
"Elapsed time: 539.266896 msecs"
39088169
user=> (time (fibo 40))
"Elapsed time: 1436.374408 msecs"
102334155

関数 fibo はフィボナッチ数列を計算します。この関数は二重再帰になっているので、実行時間はとても遅いです。この関数で 38 と 40 の値を求めますが、40 は 1 秒以内に計算することができません。タイムアウトを設定する場合、次のように alts!! と timeout を使います。

user=> (a/alts!! [(a/thread (fibo 38)) (a/timeout 1000)])
[39088169 #object[clojure.core.async.impl.channels.ManyToManyChannel 0x85eabca 
"clojure.core.async.impl.channels.ManyToManyChannel@85eabca"]]

user=> (a/alts!! [(a/thread (fibo 40)) (a/timeout 1000)])
[nil #object[clojure.core.async.impl.channels.ManyToManyChannel 0x44a972b4 
"clojure.core.async.impl.channels.ManyToManyChannel@44a972b4"]]

(fibo 38) は 1 秒以内に計算することができます。返り値は (fibo 38) の計算結果になります。(fibo 40) は 1 秒以内に計算が終わりませんが、先に timeout チャネルが nil を返すので、タイムアウトしたことがわかります。

●数列の生成

遅延シーケンスのように、go ブロックとチャネルを使って数列を生成することもできます。次のリストを見てください。

リスト : 数列の生成 (go05.clj)

(require '[clojure.core.async :refer [chan close! go-loop ! !!] :as a])

;; 整数列
(defn make-ints [n m]
  (let [ch (chan)]
    (go-loop [n n]
      (if (>= n m)
        (close! ch)
        (do (>! ch n)
            (recur (inc n)))))
    ch))

;; フィボナッチ数列
(defn make-fibo []
  (let [ch (chan)]
    (go-loop [a 0N b 1N]
      (>! ch a)
      (recur b (+ a b)))
    ch))

関数 make-ints は n 以上 m 未満の整数列を生成します。最初に、整数列を出力するチャネル ch を chan で作成します。次に、go-loop で変数 n の値を n から m まで増やしていき、チャネル ch に n を書き込みます。繰り返しが終了したら close! でチャネルをクローズします。

関数 make-fibo はフィボナッチ数列を出力するチャネルを返します。go-loop は無限ループになっていることに注意してください。BigInt を使っているので、メモリが許す限り大きなフィボナッチ数を計算することができます。このように、数列を生成するプログラムは go ブロックとチャネルを使って簡単にプログラムすることができます。

簡単な実行例を示します。

user=> (load-file "go05.clj")
#'user/make-fibo

user=> (def a (make-ints 1 5))
#'user/a
user=> (<!! a)
1
user=> (<!! a)
2
user=> (<!! a)
3
user=> (<!! a)
4
user=> (<!! a)
nil

user=> (def b (make-fibo))
#'user/b
user=> (<!! b)
0N
user=> (<!! b)
1N
user=> (<!! b)
1N
user=> (<!! b)
2N
user=> (<!! b)
3N
user=> (<!! b)
5N
user=> (<!! b)
8N
user=> (<!! b)
13N

チャネルから受信したデータをコレクションに格納する場合、関数 into を使うと便利です。

(into coll ch) => chan

into は引数 ch のチャネルからデータを受信して、それを引数 coll のコレクションに追加します。返り値は結果を返すチャネルです。簡単な例を示します。

user=> (<!! (a/into [] (make-ints 0 4)))
[0 1 2 3]
user=> (<!! (a/into [0 1 2 3 4 5] (make-ints 6 10)))
[0 1 2 3 4 5 6 7 8 9]

user=> (<!! (a/into '() (make-ints 0 5)))
(4 3 2 1 0)
user=> (<!! (a/into '(9 8 7 6) (make-ints 0 5)))
(4 3 2 1 0 9 8 7 6)

関数 make-fibo が返すチャネルは無限数列になるので、このまま into を適用することはできません。この場合は take を使います。

(take n ch)              => chan
(take n ch buff-or-size) => chan

take は引数のチャネル ch から n 個の要素を取り出し、それを出力するチャネル chan を返します。ch がクローズされる、または n 個の要素を出力すると、chan はクローズされます。第 3 引数の buff-or-size は関数 chan の第 1 引数と同じで、バッファのサイズやバッファリングの動作を変更することができます。

簡単な例を示しましょう。

user=> (def f (a/take 40 (make-fibo)))
#'user/f

user=> (<!! (a/into [] f))
[0N 1N 1N 2N 3N 5N 8N 13N 21N 34N 55N 89N 144N 233N 377N 610N 987N 1597N 2584N
 4181N 6765N 10946N 17711N 28657N 46368N 75025N 121393N 196418N 317811N 514229N
 832040N 1346269N 2178309N 3524578N 5702887N 9227465N 14930352N 24157817N
 39088169N 63245986N]

●チャネルの操作

to-chan! と to-chan!! はコレクションの要素を順番に取り出して、それをチャネルに送信します。

(to-chan!  coll) => chan
(to-chan!! coll) => chan

返り値は要素を受け取るチャネルです。要素が無くなるとチャネルはクローズされます。簡単な例を示しましょう。

user=> (require '[clojure.core.async :as a])
nil

user=> (def c (a/to-chan!! '(1 2 3 4 5)))
#'user/c
user=> (a/<!! c)
1
user=> (a/<!! c)
2
user=> (a/<!! c)
3
user=> (a/<!! c)
4
user=> (a/<!! c)
5
user=> (a/<!! c)
nil

user=> (def d (a/to-chan!! {:foo 10, :bar 20, :baz 30}))
#'user/d
user=> (a/<!! d)
[:foo 10]
user=> (a/<!! d)
[:bar 20]
user=> (a/<!! d)
[:baz 30]
user=> (a/<!! d)
nil

onto-chan! と onto-chan!! はコレクションの要素を順番に取り出して、それを引数のチャネルに送信します。

(onto-chan!  ch coll)
(onto-chan!  ch coll close?)
(onto-chan!! ch coll)
(onto-chan!! ch coll close?)

デフォルトでは、要素が無くなるとチャネルはクローズされますが、引数 close? に偽を指定すると動作を変更することができます。

user=> (def c (a/chan 8))
#'user/c
user=> (a/onto-chan!! c '(1 2 3 4))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x64b242b3 
"clojure.core.async.impl.channels.ManyToManyChannel@64b242b3"]
user=> (a/<!! c)
1
user=> (a/<!! c)
2
user=> (a/<!! c)
3
user=> (a/<!! c)
4
user=> (a/<!! c)
nil

user=> (def d (a/chan 8))
#'user/d
user=> (a/onto-chan!! d '(1 2 3 4) false)
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x75e068dc 
"clojure.core.async.impl.channels.ManyToManyChannel@75e068dc"]
user=> (a/<!! d)
1
user=> (a/<!! d)
2
user=> (a/<!! d)
3
user=> (a/<!! d)
4
user=> (a/<!! d)  ;; ブロッキングされる

チャネル用の map 関数も用意されています。

(map func chs)              => chan
(map func chs buff-or-size) => chan

map は引数のチャネルのコレクション chs からデータを読み込み、それを func に適用した結果をチャネルに書き込みます。返り値は結果を出力するチャネルです。デフォルトではバッファリングなしのチャネルを生成し、そこに結果を書き込みます。第 3 引数 buff-or-size は関数 chan の第 1 引数と同じで、バッファのサイズやバッファリングの動作を変更することができます。

簡単な例を示しましょう。

user=> (def x (a/map #(* % %) [(make-ints 0 10)]))
#'user/x
user=> (<!! x)
0
user=> (<!! x)
1
user=> (<!! x)
4
user=> (<!! x)
9
user=> (<!! x)
16
user=> (<!! x)
25
user=> (<!! x)
36
user=> (<!! x)
49
user=> (<!! x)
64
user=> (<!! x)
81
user=> (<!! x)
nil

この他にも core.async には便利な関数が用意されています。興味のある方は core.async のリファレンスをお読みくださいませ。


初版 2025 年 8 月 16 日