如何使用换能器从另一个通道创建通道?

How to create a channel from another with transducers?

我想从另一个只过滤特定消息的频道创建一个 clojure.core.async 频道。因此我找到了一个名为 filter<.

的函数
=> (def c1 (chan))
=> (def c2 (filter< even? c1))
=> (put! c1 1)
=> (put! c1 2)
=> (<!! c2)
2

但是该函数及其朋友被标记为已弃用:

Deprecated - this function will be removed. Use transducer instead

有一些方法可以使用带有传感器的通道,例如 chanxform 参数。如何使用换能器从现有通道构建新通道?

我对此做了一些研究,发现了几篇有趣的文章 (first and second), and then got something working using pipeline

(require '[clojure.core.async :as async :refer [chan <!! pipeline put!]])
(def c1 (chan))
(def c2 (chan))

(pipeline 4 c2 (filter even?) c1)

(put! c1 1)
(put! c1 2)
(<!! c2)
;;=> 2

我链接的第二篇文章通过围绕管道函数的一些辅助函数使它更清晰一些:

(defn ncpus []
  (.availableProcessors (Runtime/getRuntime)))

(defn parallelism []
  (+ (ncpus) 1))

(defn add-transducer
  [in xf]
  (let [out (chan (buffer 16))]
    (pipeline (parallelism) out xf in)
    out))

然后您可以简单地将频道与

绑定在一起
(def c1 (chan))
(def c2 (add-transducer c1 (filter even?))

要完成答案,您发现自己可以以类似的方式使用管道:

(defn pipe-trans
  [ci xf]
  (let [co (chan 1 xf)]
    (pipe ci co)
    co))
(def c1 (chan))
(def c2 (pipe-trans c1 (filter even?)))