为什么 `core.async/pipeline` return 是一个频道?

Why does `core.async/pipeline` return a channel?

我刚刚注意到 pipeline 系列 return 是一个 channel,它似乎完全独立于管道及其相关通道的用途。

请注意,在以下示例中,您可以分别从 pipesa> / b> >! / <!,它们是不相关的。

据我了解,pipelines 应该是空操作,并且 return nil 在设置来自 [=21 的副作用 transduction 时=] 到 b>.

所以,我错过了什么,为什么 pipeline return channel

(def a> (chan))
(def b> (chan))
(def pipes (pipeline-blocking 4
                     b>
                     (map clojure.string/upper-case)
                     a>))
(go (>! pipes "hello world"))
(go (println "Pipes: " (<! pipes)))
(go (>! a> "apples are gooood"))
(go (println "B: " (<! b>)))

您返回一个通道,该通道在没有更多元素可复制时关闭。也就是说,在 a> 关闭后,它的所有元素都变成大写并放在 b> 上。您可以 <! 从生成的通道中找出流水线操作何时完成,如果您关心的话,或者您可以直接丢弃该通道。你可能不应该写信给它。

这是许多异步操作的常见模式,而且确实经常隐含地发生:每个 go 块 returns 一个通道,其中写入了块的 return 值当块完成时,许多异步操作使用 go 块作为它们的 return 值,因此您会自动获得此 "job done" 通道作为结果。

为了阐述@amalloy 的回答,在下面的示例中,a>b> 在能够完成时将 true 放在它们上面。由于 chan> 是无缓冲的,它们无法完成,直到另一个进程从它们中取出,即最后的 println

如果 chan> 被缓冲,a>b> 可以立即 >!,所以立即打印。

(def chan> (chan 4))
(def a> (go (>! chan> "Apple")))
(go (println "from a>: " (<! a>)))
(def b> (go (>! chan> "Ball")))
(go (println "from b>: " (<! b>)))

(go (println "from chan>: "(<! chan>)))
;; => from chan>: Apple
;; => from a>: true
(go (println "from chan>: "(<! chan>)))
;; => from chan>: Ball
;; => from b>: true

这与 pipelines 背后的想法相同。


;; Pipeline-specific

(def a> (chan))
(def b> (chan))
(def p> (pipeline 4 b> (map clojure.string/upper-case) a>))

;; this won't happen until `a>` `close!`s
(go (println "pipeline is done: " (<! p>)))

;; execute the following 2 lines ad lib
(go (>! a> "hi there"))
(go (println "from b>: " (<! b>)))

(comment
  (close! a>) ; triggers the "pipeline is done"
  (close! b>)) ; doesn't trigger it, but `b>` now only returns nil when taking