M.Hiroi's Home Page

Clojure Programming

お気楽 Clojure プログラミング超入門


Copyright (C) 2025 Makoto Hiroi
All rights reserved.

並行プログラミング

今回は Clojure で簡単な並行プログラミングに挑戦してみましょう。

●並行プログラミングとは?

「並行 (concurrent) プログラミング」は複数のプログラムを並行に実行しますが、このとき複数の CPU で同時に動かす場合と、ひとつの CPU で複数のプログラムを動かす場合があります。一般的には、前者を「並列 (parallel) プログラミング」といい、複数のハードウェアを並列に実行することによる処理速度の向上が主な目的となります。

後者の場合、一定時間毎に実行するプログラムを切り替えることで、複数のプログラムを並列に実行しているかのように見せることができます。この処理を「時分割 (time sharing)」もしくは「タイム・スライス (time slice)」といいます。一般に、タイム・スライスは OS でサポートされている機能です。OS が実行するプログラムのことを「プロセス (process)」または「タスク (task)」といいます。

並列的に動作するプログラムをひとつのプロセスだけで作るのはけっこう大変です。そこで、プロセス内では逐次的な処理にとどめ、複数のプロセス間で情報交換を行うことにより、全体で並列的な動作を実現することを考えます。このほうが自然にプログラムを記述できる場合があるのです。これが後者の主な目的となります。

プロセスは互いに独立したプログラムですが、OS にはプロセス間でデータをやり取りする機能 (プロセス間通信) が用意されています。たとえば、UNIX ライクな OS では「パイプ (pipe)」を使って複数のプログラム (コマンド) を連結することができます。この場合、パイプを通してデータがプログラムに送られ、各プログラムは並行に動作することになります。入出力処理で待たされるコマンドがあったとしても、その間に他のコマンドが実行されるので、各コマンドを逐次的に実行するよりも、効率的に処理することが可能です。

最近では、ひとつのプログラムの中で独立した複数の処理を記述できるようになりました。この機能を「スレッド (thread)」とか「マルチスレッド」いいます。スレッドは「縫い糸」という意味ですが、プログラムでは「制御の流れ」という意味で使われています。並列的な動作をプログラムする場合、逐次的な処理をひとつのスレッドに割り当て、複数のスレッドを並行に動作させることにより、全体で並列的な動作を実現するわけです。

一般に、スレッドは一定時間毎に実行するスレッドを強制的に切り替えます。このとき、スレッドのスケジューリングは処理系が行います。これを「プリエンプティブ (preemptive)」といいます。これに対し、Ruby のファイバーや Lua のコルーチンは、プログラムの実行を一定時間毎に切り替えるものではありません。他のプログラムが実行できるよう自主的に処理を中断する、といった協調的な動作を行わせることで、複数のプログラムを並行に動作させています。これを「ノンプリエンプティブ (nonpreemptive)」といいます。

スレッドは同じプロセス内に存在するので、メモリ空間を共有することができます。これを「共有メモリ」といいます。スレッド間の通信は共有メモリを使って簡単に行うことができますが、プリエンプティブなスレッドの場合、共有メモリのアクセス時に発生する「競合」が問題になります。このため、プリエンプティブなマルチスレッドをサポートしているプログラミング言語では、競合を回避するための仕組みが用意されています。

Clojure の並行プログラミングは Java のスレッドシステムに依存していています。クラス Thread を使ってスレッドを生成する方法もありますが、今では Java と同様の future や、Go 言語の goroutine とよく似た動作をするライブラリ core.async など、もっと便利な方法がサポートされています。

●クラス Thread によるスレッドの生成

最初にクラス Thread を使ってスレッドを生成してみましょう。

(Thread. (fn [] ...))

スレッドはコンストラクタ Thread. で生成します。Thread. には引数のない無名関数を渡します。メソッド .start でスレッドに渡した関数を評価します。メソッド .join はスレッドが終了するまで待機します。簡単な例を示しましょう。

リスト : スレッドの生成 (thread01.clj)

(ns thread01)

(defn worker [n wait]
  (dotimes [_ 8]
    (printf "worker %d\n" n)
    (flush)
    (Thread/sleep wait)))

(defn -main []
  (let [t1 (Thread. #(worker 1 400))]
    (.start t1)
    (worker 0 300)
    (.join t1)))

関数 worker は番号 n を wait 間隔で 8 回表示します。flush は *out* のバッファの内容を吐き出す関数です。Thread/sleep wait は wait msec 秒だけスレッドを休止します。関数 -main では、Thread. で新しいスレッドを生成して変数 t1 にセットします。(.start t1) でスレッドの実行を開始して、(worker 0 300) を評価します。これで worker 0, 1 が平行に動作することになります。最後に (.join t1) でスレッド t1 が終了するのを待ちます。

それでは実行してみましょう。

$ clj -M -m thread01
worker 0
worker 1
worker 0
worker 1
worker 0
worker 1
worker 0
worker 1
worker 0
worker 0
worker 1
worker 0
worker 1
worker 0
worker 1
worker 1 

worker 0 と worker 1 が 8 回表示されました。スレッド t1 と -main 関数が平行に動作していることがわかります。

●スレッドプール

スレッドの生成や削除は重たい処理なので、事前に作成した複数のスレッドをプールしておいて、必要に応じてそこからスレッドを割り当てる方法があります。これを「スレッドプール」といいます。Clojure でスレッドプールを直接操作するには、Java のクラス Executors を使います。Java のクラスは import でロードします。

(import '(パッケージ.名前))
(import '(パッケージ 名前1 名前2 ...))

ns マクロの :import を使ってもかまいません。Executors はパッケージ java.util.concurrent にあります。スレッドプールを生成するメソッドが用意されているので、適当なものを選んでください。今回は newFixedThreadPool size を使うことにします。size はスレッドプールの容量です。返り値はインタフェース ExecutorService を実装したオブジェクトで、ここに定義されているメソッドを使ってスレッドを操作します。

簡単な例を示しましょう。

リスト : スレッドプール (thread02.clj)

(ns thread02
  [:import (java.util.concurrent Executors)])

(defn worker [n wait]
  (dotimes [_ 8]
    (printf "worker %d\n" n)
    (flush)
    (Thread/sleep wait)))

(defn -main []
  (let [pool  (Executors/newFixedThreadPool 3)
        tasks [#(worker 1 300) #(worker 2 700) #(worker 3 500)]]
   (.invokeAll pool tasks)
   (.shutdown pool)))

関数 -main では、newFixedThreadPool で生成したスレッドプールを変数 pool にセットし、スレッドで実行する関数を配列 tasks にセットします。メソッド .invokeAll は tasks の関数からスレッドを生成し、それをプールに登録して実行します。.ivokeAll は全てのスレッドが終了まで待機します。最後に、スレッドプールを .shutdown で終了します。これをしないとプログラムは終了しません。

それでは実行してみましょう。

$ clj -M -m thread02
worker 2
worker 3
worker 1
worker 1
worker 3
worker 1
worker 2
worker 1
worker 3
worker 1
worker 2
worker 3
worker 1
worker 1
worker 3
worker 2
worker 1
worker 3
worker 2
worker 3
worker 2
worker 3
worker 2
worker 2

worker 1, 2, 3 が並行に動作していることがわかります。

●future

Clojure では future を使って簡単にスレッドを生成することができます。

(future & body)

future は S 式 body を受け取り、future オブジェクトを生成して返します。future は別のスレッドで body を呼び出し、最後の評価結果を遅延評価 delay のようにキャッシュします。この値は deref または @ で取り出すことができます。スレッドが終了していない場合、deref や @ の呼び出しはブロックされます。なお、future は暗黙のうちにスレッドプールが使用されます。

簡単な例を示しましょう。

user=> (let [a (future (println "start") (Thread/sleep 1000) (println "end") :done)]
(println @a))
start
end
:done
nil

future の body は println で start を表示し、sleep で 1 秒間休止したあと end を表示して :done を返します。実際に実行すると、start が表示されて 1 秒後に end が表示されるまで、(println @a) はブロックされます。スレッドが終了すると、@a の値が求まるので、:done が表示されます。

複数のスレッドを生成するのも簡単です。次のリストを見てください。

リスト : future の使用例 (thread03.js)

(ns thread03)

(defn worker [n wait]
  (dotimes [_ 8]
    (printf "worker %d\n" n)
    (flush)
    (Thread/sleep wait))
  (str "worker " n " done"))

(defn -main []
  (let [a (future (worker 1 200))
        b (future (worker 2 600))
        c (future (worker 3 400))]
    (println @a)
    (println @b)
    (println @c)
    (shutdown-agents)))

future を使う場合、-main の最後で shutdown-agents を実行してください。そうしないとプログラムは終了しません。実行例を示します。

$ clj -M -m thread03
worker 1
worker 2
worker 3
worker 1
worker 3
worker 1
worker 2
worker 1
worker 3
worker 1
worker 1
worker 2
worker 3
worker 1
worker 1
worker 3
worker 1 done
worker 2
worker 3
worker 2
worker 3
worker 3
worker 2
worker 2
worker 2
worker 2 done
worker 3 done

上記のプログラムは pcalls を使うともっと簡単になります。

(pcalls & funcs)

pcalls は引数の funcs を並列に実行します。funcs は引数を受け取らない関数です。返り値は funcs の結果を格納した遅延シーケンスです。future のプログラムを pcalls で書き直すと次のようになります。

リスト : pcalls の使用例 (thread3a.clj)

(ns thread3a)

(defn worker [n wait]
  (dotimes [_ 8]
    (printf "worker %d\n" n)
    (flush)
    (Thread/sleep wait))
  (str "worker " n " done"))

(defn -main []
  (doseq [x (pcalls #(worker 1 200) #(worker 2 600) #(worker 3 400))]
    (println x))
  (shutdown-agents))
$ clj -M -m thread3a
worker 1
worker 2
worker 3
worker 1
worker 3
worker 1
worker 2
worker 1
worker 3
worker 1
worker 1
worker 2
worker 3
worker 1
worker 1
worker 3
worker 1 done
worker 2
worker 3
worker 2
worker 3
worker 3
worker 2
worker 2
worker 2
worker 2 done
worker 3 done

pcalls は future を使って実装されているそうです。最後に shutdown-agents を実行してください。

●pmap

pmap は map の並列版です。引数の関数を並列で実行します。pmap の返り値は関数の評価結果を格納した遅延シーケンスです。簡単な例を示しましょう。

user=> (defn fibo [n] (if (< n 2) n (+ (fibo (- n 1)) (fibo (- n 2)))))
#'user/fibo
user=> (time (fibo 38))
"Elapsed time: 608.920536 msecs"
39088169
user=> (time (fibo 39))
"Elapsed time: 927.08361 msecs"
63245986
user=> (time (fibo 40))
"Elapsed time: 1472.558635 msecs"
102334155

user=> (time (println (map fibo '(38 39))))
(39088169 63245986)
"Elapsed time: 1477.541587 msecs"
nil

user=> (time (println (pmap fibo '(38 39))))
(39088169 63245986)
"Elapsed time: 1068.974514 msecs"
nil

user=> (time (println (map fibo '(38 39 40))))
(39088169 63245986 102334155)
"Elapsed time: 2928.305299 msecs"
nil

user=> (time (println (pmap fibo '(38 39 40))))
(39088169 63245986 102334155)
"Elapsed time: 1855.03121 msecs"
nil

関数 fibo はフィボナッチ数列を求めます。fibo は二重再帰で定義しているので、実行時間はとても遅いです。map で 38, 39 のフィボナッチ数を求めると約 1.5 秒かかりますが、pmap を使うと 1.1 秒と速くなります。38, 39, 40 の場合は 2.9 秒かかるのが、pmap を使うと 1.9 秒まで速くなります。

M.Hiroi のパソコン (CPU, Intel Core i5-6200U 2.30GHz) は物理コア数が 2 で、1 コアにつきハイパースレッディングで 2 分割できるので、論理コア数は 4 となります。4 つのスレッドを並列に動かしても、4 倍速くなるわけではありませんが、それでも並列処理の効果は十分に出ていると思います。

この他に、複数の S 式を受け取り、それらを並列で実行する pvalues もあります。

(pvalues & s-expr)

返り値は S 式の評価結果を格納した遅延シーケンスです。簡単な実行例を示します。

user=> (time (println (pvalues (fibo 38) (fibo 39))))
(39088169 63245986)
"Elapsed time: 1174.63283 msecs"
nil

user=> (time (println (pvalues (fibo 38) (fibo 39) (fibo 40))))
(39088169 63245986 102334155)
"Elapsed time: 1974.013576 msecs"
nil

pmap と pvalues も、最後に shutdown-agents を実行してください。

●スレッドの排他制御

マルチスレッドで共有メモリなど資源の競合を回避することを「排他制御」といいます。昔から用いられている方法に「ロック (lock)」があります。これは Clojure (Java) でも使用することができますが、Clojure には「ソフトウェアトランザクショナルメモリ (Software Transactional Memory, STM)」という方法が用意されています。

一般に、「トランザクション (transaction)」は処理とか取引という意味ですが、SQL などのデータベースでは「関連した複数の処理を一つの処理にまとめたもの」をトランザクションといいます。M.Hiroi は勉強不足で STM のことをよく理解していません。説明は上記ページをお読みくださいませ。

Clojure の STM には Atom, Ref, Agent といった 3 つの操作があります。これを「アトミック操作 (atomic operation)」と呼ぶことがあります。アトミック操作は、複数のスレッドが同時に同じ変数を変更しようとしても、競合状態を回避するための仕組みです。

簡単な例を示しましょう。次のリストを見てください。

リスト : Atom の使用例

(ns thread04)

(defn -main []
  (let [x (atom 0)
        a (future (dotimes [_ 500] (swap! x inc)) :done-a)
        b (future (dotimes [_ 500] (swap! x inc)) :done-b)]
    (println @a)
    (println @b)
    (println @x)
    (shutdown-agents)))

変数 x を (atom 0) で初期化します。これで x を mutable な変数として使用することができます。次に、スレッドで x の値を 500 回インクリメントします。これをスレッド a, b で並列に実行します。スレッドは x の値をインクリメントするだけですが、同じ変数 x を +1 するので競合が発生すると思われるかもしれません。ところが、x は Atom により排他制御が行われるので、競合は発生せずに x を 500 回ずつ +1 する、合計で x の値は 1000 になります。

それでは実行してみましょう。

$ clj -M -m thread04
:done-a
:done-b
1000

Atom のアトミック操作は一つの変数が対象であり、複数の変数を同時に排他制御することはできません。次のリストを見てください。

リスト : Atom の間違った使用例 (thread05_bad.clj)

(ns thread05_bad)

(defn -main []
  (let [x (atom 0)
        y (atom 0)
        a (future (dotimes [_ 500]
                    (swap! x inc) (swap! y + @x))
                  :done-a)
        b (future (dotimes [_ 500]
                    (swap! x inc) (swap! y + @x))
                  :done-b)]
    (println @a)
    (println @b)
    (println @x)
    (println @y)
    (shutdown-agents)))

変数 y に変数 x の累積値を求めます。スレッド a, b で 500 回ずつ x をインクリメントすると、x は 1 から 1000 まで増加するので、累積値は 500500 になるはずです。実際に試してみましょう。

$ clj -M -m thread05_bad
:done-a
:done-b
1000
500503
$ clj -M -m thread05_bad
:done-a
:done-b
1000
500527
$ clj -M -m thread05_bad
:done-a
:done-b
1000
500510

y の値が 500500 になりませんね。これは競合が発生しているからです。x の値を更新した後、そこでアトミック操作がいったん途切れます。ここで他のスレッドが x の値をインクリメントすることが可能です。つまり、y の値に x を加算するとき、x の値はさらに +1 されている場合があるのです。

●Ref

このような場合は Ref 型を使います。Ref 型は関数 ref で生成します。

(ref init-value & options)

ref は初期値が init-value の Ref 型のデータを生成します。値の取得は Atom と同じで deref または @ を使います。値の更新は次の関数を使います。

alter は ref の値を (f @atom x y ...) の評価結果に書き換えます。ref-set は ref の値を newval に書き換えます。値の更新は dosync の中で行う必要があります。dosync の範囲がトランザクション (アトミック操作) になります。簡単な例を示しましょう。

user=> (def a (ref 0))
#'user/a
user=> a
#object[clojure.lang.Ref 0x4a67318f {:status :ready, :val 0}]
user=> @a
0

user=> (dosync (alter a inc))
1
user=> @a
1

user=> (dosync (alter a + 100))
101
user=> @a
101

user=> (dosync (ref-set a 1000))
1000
user=> @a
1000

user> (ref-set a 10000)
Execution error (IllegalStateException) at user/eval154 (REPL:1).
No transaction running

それでは、thread05_bad.clj を ref で書き直してみましょう。

リスト : ref の使用例

(ns thread05)

(defn -main []
  (let [x (ref 0)
        y (ref 0)
        a (future (dotimes [_ 500]
                    (dosync (alter x inc) (alter y + @x)))
                  :done-a)
        b (future (dotimes [_ 500]
                    (dosync (alter x inc) (alter y + @x)))
                  :done-b)]
    (println @a)
    (println @b)
    (println @x)
    (println @y)
    (shutdown-agents)))

atom を ref に、swap! を alter に書き直します。そして、2 つの alter を dosync で囲みます。これで 2 つの更新がアトミック操作になります。

それでは実行してみましょう。

$ clj -M -m thread05
:done-a
:done-b
1000
500500
$ clj -M -m thread05
:done-a
:done-b
1000
500500
$ clj -M -m thread05
:done-a
:done-b
1000
500500

正常に動作しているようです。

●バリデーション

Ref はバリデーションチェックを行うことができます。一般に、「バリデーション (validation)」には検証や妥当性の確認といった意味があります。データのバリデーションチェックは、データの整合性が保たれているか検証・確認する機能ということになります。

Ref の場合、ref のオプション :validator を使うか、関数 set-validator! で指定します。

ref x :validator validate-fn
set-validator! ref-obj validate-fn

関数 validate-fn が真を返すとき、データの更新が行われます。偽を返す場合はエラーを送出します。

簡単な例を示しましょう。

user=> (def a (ref 0 :validator int?))
#'user/a

user=> (dosync (ref-set a 10))
10
user=> @a
10

user=> (dosync (ref-set a 1.0))
Execution error (IllegalStateException) at user/eval140 (REPL:1).
Invalid reference state

:validator に int? を指定します。ref-set で 10 を書き込むと int? は真を返すので、更新は正常に行われます。次に、ref-set で 1.0 を書き込むと、int? が false を返すので更新は失敗してエラーを送出します。

●トランザクションのリトライ

トランザクションの処理中に他のスレッドから Ref の値を更新された場合、実行中の処理を廃棄してトランザクションを最初からやり直します。これをリトライと呼ぶことにしましょう。次の例を見てください。

user=> (def cnt (ref 0))
#'user/cnt
user=> (defn counter [n] (dosync (println "start " n) (Thread/sleep 100) (alter cnt inc)))
#'user/counter

user=> (do (counter 1) (counter 2))
start  1
start  2
2
user=> @cnt
2

unser=> (pvalues (counter 1) (counter 2))
start  1
(start  2
start  2
3 4)
user=> @cnt
4

大域変数 cnt に (ref 0) をセットします。関数 counter はトランザクションの最初で start n を表示してから 100 msec 休止して、alter で cnt を +1 します。do で (counter 1) と (counter 2) を逐次実行すると、start 1 と start 2 が表示され、@cnt は 2 になります。

pvalues で並列に実行すると、start 1 と start 2 が表示された後、再度 start 2 が表示されて、@cnt は 4 になります。(counter 2) のトランザクションがリトライされていることがわかります。

ところで、数学や論理学において、演算や操作の順序を入れ替えても結果が変わらない性質を「可換性」といいます。たとえば、2 + 3 は数字を入れ替えて 3 + 2 としても結果は 5 になります。2 - 3 は -1 ですが、数字を入れ替えて 3 - 2 とすると 1 になるので、可換性は成り立ちません。

Ref の場合、更新関数の順番を変えても、Ref 変数の結果が変わらないことを更新関数の「可換性」といいます。(counter 1) と (counter 2) は、どちらが先に実行されようと、cnt の値は +2 されます。cnt の値が 0 であれば、どちらの場合も 2 になるわけです。

可換性が成り立つ場合、更新処理の途中で他のスレッドの更新処理が Ref 変数の値を書き換えたとしても、リトライする必要はありません。リトライして cnt の値を再取得して +1 するのか、リトライせずに書き換えられた値を +1 するのか、どちらの場合でも cnt の値は +2 されるので、正しい値が得られるわけです。

トランザクションのリトライを行わない関数が commute です。

commute の使い方は alter と同じですが、更新関数は可換性が成り立つ必要があります。簡単な例を示しましょう。

user=> (defn counter1 [n] (dosync (println "start " n) (Thread/sleep 100) (commute cnt inc)))
#'user/counter1
user=> (def cnt (ref 0))
#'user/cnt

user=> (pvalues (counter1 1) (counter1 2))
(start  start  2
1
1 1)
user=> @cnt
2

user=> (pvalues (counter1 1) (counter1 2))
start ( 1
start  2
3 4)
@cnt
4

関数 counter1 は counter の alter を commute に変更しただけです。pvalues で counter1 を並列に実行すると、トランザクションはリトライしませんが、@cnt の値は +2 されていて、正しい値が得られていることがわかります。

commute を使うと、並列処理を効率的に行うことができます。次の例を見てください。

user=> (def cnt (ref 0))
#'user/cnt
user=> (defn alter-inc [] (dosync (Thread/sleep 100) (alter cnt inc)))
#'user/alter-inc
user=> (defn commute-inc [] (dosync (Thread/sleep 100) (commute cnt inc)))
#'user/commute-inc

user=> (time (println (apply pcalls (repeat 10 alter-inc))))
(5 1 3 10 2 6 4 7 8 9)
"Elapsed time: 1015.957717 msecs"
nil
user=> @cnt
10

user=> (def cnt (ref 0))
#'user/cnt
user=> (time (println (apply pcalls (repeat 10 commute-inc))))
(3 4 5 2 5 1 7 8 8 8)
"Elapsed time: 204.685285 msecs"
nil
user=> @cnt
10

alter-inc と commute-inc は counter と counter1 から println を削除しただけです。pcalls で 10 個の alter-inc を評価すると、逐次実行と同程度の時間 (100 msec * 10 = 1 sec) がかかります。これに対し、10 個の commute-inc を pcalls で評価すると、並列に実行されるので時間は約 200 msec ですみます。

●Agent

Agent は変数の更新を非同期で行います。つまり、実際に更新が完了していなくても、更新関数はすぐにリターンして、次の処理へ進みます。Agent では更新処理をアクションと呼ぶことがあります。

Agent 型は関数 agent で生成します。

(agent init-value & options)

agent は初期値が init-value の Agent 型のデータを生成します。値の取得は Atom や Ref と同じで deref または @ を使います。agent はオプション :validator でバリデーションを指定することができます。もちろん、関数 set-validator! でも OK です。値の更新は次の関数を使います。

send と send-off は ref の値を (f @agent x y ...) の評価結果に書き換えます。値の更新は非同期で行われるため、send が終了したからといって、値が更新されているとは限りません。簡単な例を示しましょう。

user=> (def a (agent 0))
#'user/a
user=> a
#object[clojure.lang.Agent 0x1af1347d {:status :ready, :val 0}]
user=> @a
0

user=> (send a inc)
#object[clojure.lang.Agent 0x1af1347d {:status :ready, :val 0}]
user=> @a
1
user=> a
#object[clojure.lang.Agent 0x1af1347d {:status :ready, :val 1}]

(agent 0) で Agent 型のオブジェクトを生成します。初期値は 0 なので、@a は 0 を返します。次に、(send a inc) で a の値を +1 します。send は更新前の Agent 型のオブジェクトを直ぐに返します。それから、@a で a の値を求めると 1 になっていて、a の値が更新されていることがわかります。

関数 await は Agent の更新が終了するまで待機します。

(await & agents) => nil

簡単な例を示しましょう。

user=> (def b (agent '()))
#'user/b
user=> b
#object[clojure.lang.Agent 0x3af9aa66 {:status :ready, :val ()}]
user=> @b
()

user=> (do (send b conj 1) (send b conj 2) (send b conj 3) (await b))
nil
user=> @b
(3 2 1)

変数 b に (agent '()) をセットします。次に、do の中で (send b conj n) を呼び出して、リスト数値 n を追加していきます。最後に (await b) で更新が終了するのを待ちます。@b で b の値を求めると、数値がリストに追加されていることがわかります。

●send-off

IO などでブロックする可能性があるアクションの場合、send ではなく send-off を使います。簡単な例として画面に出力するデータをファイルにも書き込むプログラムを作ってみましょう。次のリストを見てください。

リスト : send-off の使用例 (thread07.clj)

(ns thread07)

(def log-file (agent "test_agent.txt"))

(defn write-log [msg n]
  (printf "%s %s\n" msg n)
  (flush)
  (send-off log-file (fn [file] (spit file (str msg " " n "\n") :append true) file)))

(defn -main []
  (let [a (future (dotimes [n 5]
                    (Thread/sleep 100) (write-log :test-a n))
                  :test-a-done)
        b (future (dotimes [n 7]
                    (Thread/sleep 100) (write-log :test-b n))
                  :test-b-done)]
    (println @a)
    (println @b)
    (await log-file)
    (shutdown-agents)))

変数 log-file に出力先のファイル名をセットします。関数 write-log は引数 msg と n を画面に出力し、同じ内容を log-file に書き込みます。ファイルの書き込みは send-off を使って非同期で行います。無名関数の引数 file に log-file のファイル名が渡されるので、spit で file の末尾に msg と n を追加します。最後に file を返します。log-file の値は file に書き換えられますが、file は log-file と同じ値なので、値は元のままです。

あとは future で 2 つのスレッド a, b を並列で起動し、終了するまで待機します。その後、await で log-file への書き込みが終了するのを待ってから、shutdown-agents を実行します。

それでは実行してみましょう。

$ clj -M -m thread07
:test-a 0
:test-b 0
:test-a 1
:test-b 1
:test-a 2
:test-b 2
:test-a 3
:test-b 3
:test-a 4
:test-b 4
:test-a-done
:test-b 5
:test-b 6
:test-b-done

$ cat test_agent.txt
:test-b 0
:test-a 0
:test-a 1
:test-b 1
:test-a 2
:test-b 2
:test-a 3
:test-b 3
:test-a 4
:test-b 4
:test-b 5
:test-b 6

正常に動作しているようです。興味のある方はいろいろ試してみてください。


初版 2025 年 8 月 7 日