使用 Clojure 的 core.async 使用文件内容

Consuming file contents with Clojure's core.async

我正在尝试将 Clojure 的 core.async 库用于文件中的 consume/process 行。当我的代码执行时 IOException: Stream closed 被抛出。下面是一个 REPL 会话,它重现了与我的代码中相同的问题:

(require '[clojure.core.async :as async])
(require '[clojure.java.io :as io])

; my real code is a bit more involved with calls to drop, map, filter
; following line-seq
(def lines
  (with-open [reader (io/reader "my-file.txt")]
    (line-seq reader)))

(def ch
  (let [c (async/chan)]
    (async/go
      (doseq [ln lines]
        (async/>! c ln))
      (async/close! c))
    c))

; line that causes the error
; java.io.IOException: Stream closed
(async/<!! ch)

因为这是我第一次做这样的事情(异步+文件),也许我对它应该如何工作有一些误解。有人可以阐明将文件行发送到通道管道的正确方法是什么吗?

谢谢!

您的问题是 with-open 语句。退出此范围后,文件将立即关闭。因此,您打开 line-seq 然后在读取任何行之前关闭文件。

大多数文件使用 slurp 函数会更好:

(require '[clojure.string :as str])

(def file-as-str   (slurp "my-file.txt"))
(def lines         (str/split-lines file-as-str))

参见:

http://clojuredocs.org/clojure.core/slurp

http://clojuredocs.org/clojure.string/split-lines

作为 , your definition of lines closes the file without reading all of its lines, because line-seq returns a lazy sequence. If you expand your use of the with-open 宏...

(macroexpand-1
 '(with-open [reader (io/reader "my-file.txt")]
    (line-seq reader)))

...你明白了:

(clojure.core/let [reader (io/reader "my-file.txt")]
  (try
    (clojure.core/with-open []
      (line-seq reader))
    (finally
      (. reader clojure.core/close))))

您可以通过在读完文件后关闭文件来解决此问题,而不是立即关闭:

(def ch
  (let [c (async/chan)]
    (async/go
      (with-open [reader (io/reader "my-file.txt")]
        (doseq [ln (line-seq reader)]
          (async/>! c ln)))
      (async/close! c))
    c))