如何确保所有数据都从 clojure 的 core.async 通道写入文件?
how can I insure all data is written to a file from clojure's core.async channels?
我是 core.async 的新手,我一直在尝试了解如何最好地利用 core.async 和文件 IO。我整理了一个测试,虽然打印到控制台有效,但未能写入最后一个文件。知道我错过了什么吗?
首先,一些函数...
(defn thread-write-to-files [channel]
(let [writer (atom nil)]
(thread
(loop []
(when-some [value (<!! channel)]
(if (and (map? value) (= :FILE (:type value)))
(do (when @writer (.close ^Writer @writer))
(reset! writer (io/writer (File. ^String (:name value))))
(recur))
(do (when @writer (.write @writer value)
(println value))
(recur)))))
(when @writer
(do (.flush @writer)
(.close ^Writer @writer))))))
(defn add-line-number [channel-in channel-out]
(go-loop [line-number 1]
(when-some [value (<! channel-in)]
(if (and (map? value) (= :FILE (:type value)))
(do (>! channel-out value)
(recur 1))
(do (>! channel-out (str line-number ". " value))
(recur (inc line-number)))))))
现在是一个利用它们的测试...
(deftest test-thread-write-to-file
(let [input-coll ["This gets skipped"
{:type :FILE :name "foo.txt"}
"This is the first line of foo!\n"
"This is the second line of foo.\n"
{:type :FILE :name "bar.txt"}
"Bar me 1.\n"
"Bar me 2.\n"
"Bar me 3.\n"
{:type :FILE :name "baz.txt"}
"BBBBBBBBBBB\n"
"AAAAAAAAAAA\n"
"ZZZZZZZZZZZ\n"]
input-channel (async/to-chan input-coll)
output-channel (chan)
foo (File. "foo.txt")
bar (File. "bar.txt")
baz (File. "baz.txt")]
(when (.exists foo) (.delete foo))
(when (.exists bar) (.delete bar))
(when (.exists baz) (.delete baz))
(add-line-number input-channel output-channel)
(thread-write-to-files output-channel)
(Thread/sleep 1000)
(is (.exists foo))
(is (.exists bar))
(is (.exists baz))
(is (> (.length foo) 0))
(is (> (.length bar) 0))
(is (> (.length baz) 0))))
测试的最后一个条件失败。文件 baz.txt 已创建,但为空。我的 REPL 从输入中打印出每一行,所以我很困惑为什么文件仍然是空的。
在 thread-write-to-files
中,您执行最终文件的刷新和关闭 当输入通道关闭时 (当 (when-some [value (<!! channel)] ...)
得到 nil 并退出loop
).
您的测试永远不会关闭通道,所以这不会发生。尝试在 output-channel
上使用 close!
,或者使用 onto-chan
而不是 to-chan
将测试数据集合放入系统中。
我是 core.async 的新手,我一直在尝试了解如何最好地利用 core.async 和文件 IO。我整理了一个测试,虽然打印到控制台有效,但未能写入最后一个文件。知道我错过了什么吗?
首先,一些函数...
(defn thread-write-to-files [channel]
(let [writer (atom nil)]
(thread
(loop []
(when-some [value (<!! channel)]
(if (and (map? value) (= :FILE (:type value)))
(do (when @writer (.close ^Writer @writer))
(reset! writer (io/writer (File. ^String (:name value))))
(recur))
(do (when @writer (.write @writer value)
(println value))
(recur)))))
(when @writer
(do (.flush @writer)
(.close ^Writer @writer))))))
(defn add-line-number [channel-in channel-out]
(go-loop [line-number 1]
(when-some [value (<! channel-in)]
(if (and (map? value) (= :FILE (:type value)))
(do (>! channel-out value)
(recur 1))
(do (>! channel-out (str line-number ". " value))
(recur (inc line-number)))))))
现在是一个利用它们的测试...
(deftest test-thread-write-to-file
(let [input-coll ["This gets skipped"
{:type :FILE :name "foo.txt"}
"This is the first line of foo!\n"
"This is the second line of foo.\n"
{:type :FILE :name "bar.txt"}
"Bar me 1.\n"
"Bar me 2.\n"
"Bar me 3.\n"
{:type :FILE :name "baz.txt"}
"BBBBBBBBBBB\n"
"AAAAAAAAAAA\n"
"ZZZZZZZZZZZ\n"]
input-channel (async/to-chan input-coll)
output-channel (chan)
foo (File. "foo.txt")
bar (File. "bar.txt")
baz (File. "baz.txt")]
(when (.exists foo) (.delete foo))
(when (.exists bar) (.delete bar))
(when (.exists baz) (.delete baz))
(add-line-number input-channel output-channel)
(thread-write-to-files output-channel)
(Thread/sleep 1000)
(is (.exists foo))
(is (.exists bar))
(is (.exists baz))
(is (> (.length foo) 0))
(is (> (.length bar) 0))
(is (> (.length baz) 0))))
测试的最后一个条件失败。文件 baz.txt 已创建,但为空。我的 REPL 从输入中打印出每一行,所以我很困惑为什么文件仍然是空的。
在 thread-write-to-files
中,您执行最终文件的刷新和关闭 当输入通道关闭时 (当 (when-some [value (<!! channel)] ...)
得到 nil 并退出loop
).
您的测试永远不会关闭通道,所以这不会发生。尝试在 output-channel
上使用 close!
,或者使用 onto-chan
而不是 to-chan
将测试数据集合放入系统中。