创建需要多次关闭的通道

Create channels that need to be closed several times

我的情况是同一个通道在不同的函数之间共享,我需要在所有这些函数发出完成信号时关闭通道。这是我想出的,但它并不理想,因为我需要处理两个频道并且需要发送一个特殊的关键字而不是使用关闭!功能。还有另一种更好的方法吗? core.async 有一些功能可以做到这一点吗?

(defn shared-chan [n]
  (let [in (chan)
        out (chan)]
    (go-loop [n n]
      (if (= n 0)
        (do
          (async/close! in)
          (async/close! out))
        (let [in-msg (<! in)]
          (if (not= :close in-msg)
            (do
              (>! out in-msg)
              (recur n))
            (recur (dec n))))))
    [in out]))

merge 或许能帮到你。根据文档,merge

Takes a collection of source channels and returns a channel which contains all values taken from them. ... The channel will close after all the source channels have closed.

所以,基本上您需要为每个函数创建一个通道,然后 merge 将它们合二为一。

您可以使用 take 转换器生成此共享频道。例如,如果您想要 3 个项目的自动关闭频道:

user> (require '[clojure.core.async :as a])
nil

user> (def shared-ch (a/chan 3 (take 3)))
#'user/shared-ch

user> (a/go-loop []
        (when-let [val (a/<! shared-ch)]
          (println :reading val)
          (recur)))

#object[clojure.core.async.impl.channels.ManyToManyChannel 0x6bb1dee5 "clojure.core.async.impl.channels.ManyToManyChannel@6bb1dee5"]

user> (a/>!! shared-ch 1)
true:reading 1

user> (a/>!! shared-ch 2)
true:reading 2

user> (a/>!! shared-ch 3)
true:reading 3

user> (a/>!! shared-ch 4)
false

user> (a/>!! shared-ch 5)
false

你可以看到,一旦换能器耗尽,通道就会关闭(因为 false 在写入尝试后返回)