如何防止 close!-ing 在放入 onto-chan 之前

How to prevent close!-ing before put-ing in onto-chan

我想 运行 一个类似

的代码
(->> input
     (partition-all 5)
     (map a-side-effect)
     dorun)

异步划分输入和输出(副作用)。

那我写了下面的代码来实验一下。

;; using boot-clj
(set-env! :dependencies '[[org.clojure/core.async "0.2.374"]])
(require '[clojure.core.async :as async :refer [<! <!! >! >!!]])

(let [input (range 18)
      c (async/chan 1 (comp (partition-all 5)
                            (map prn)))]
  (async/onto-chan c input false)
  (async/close! c))

此代码的解释:

我希望上面的代码打印出来

[0 1 2 3 4]
[5 6 7 8 9]
[10 11 12 13 14]
[15 16 17]

但在 REPL 中它不打印任何字符。

然后我加一个等待的时间,像这样

(let [c (async/chan 1 (comp (partition-all 5)
                            (map prn)))]
  (async/onto-chan c (range 18) false)
  (Thread/sleep 1000) ;wait
  (async/close! c))

以上代码给出了我的预期输出。

然后我检查 core.async/onto-chan

我认为发生了什么:

  1. 频道 c core.async/close! 在我的代码中编辑。
  2. core.async/onto-chan 的参数的每一项在 onto-chan 中的 go-loop 中被放入(core.async/>!)是徒劳的,因为通道 c 已关闭.

有确定的方法可以在 close!ing 之前放置物品吗? 不使用 go-loop?

编写 onto-chan 的同步版本

还是我的想法有误?

采纳 megakorre 的想法:

(let [c (async/chan 1 (comp (partition-all 5)
                            (map prn)))
      put-ch (async/onto-chan c (range 18) false)]
  (async/alts!! [put-ch])
  (async/close! c))

你的第二个例子 Thread.sleep 只是错误地“起作用”了。

它起作用的原因是来自 c 转换器的每个转换结果值都是 nil,并且由于通道中不允许使用 nil,一个例外被抛出,并且没有值被放入 c:这就是允许生产者 onto-chan 继续放入通道而不是阻塞等待的原因。如果将第二个示例粘贴到 REPL 中,您将看到四个堆栈跟踪 - 每个分区一个。

nils 当然是由于 prn 上的映射,这是对所有输入 returns nil 的副作用函数。

如果我正确理解你的设计,你的目标是做这样的事情:

(defn go-run! [ch proc]
  (async/go-loop []
    (when-let [value (<! ch)]
      (proc value)
      (recur))))

(let [input (range 18)
      c (async/chan 1 (partition-all 5))]
  (async/onto-chan c input)
  (<!! (go-run! c prn)))
  • 你确实需要一个生产者和一个消费者,否则你的程序会阻塞。我介绍了一个 go-loop 消费者。
  • 一般来说,map 和副作用不能很好地结合在一起,所以我将副作用 prn 提取到消费者中。
  • onto-chan 不能被“多次”调用(至少在显示的代码中如此)所以它不需要 false 参数。