当线程不断从中获取通道时,什么时候会丢弃通道?

When will channel be discarded when a thread keeps taking from it?

考虑以下摘自 example walkthrough of core.async 的代码:

(let [c1 (chan)
      c2 (chan)]
   (thread 
      (while true
         (let [[v ch] (alts!! [c1 c2])]
              (println "Read" v "from" ch))))
   (>!! c1 "hi")
   (>!! c2 "there"))

我的假设是该线程具有对通道 c1c2 的引用,并且基本上 运行 将永远尝试从永远不会出现的任何一个中获取值。因此,通道既不会被垃圾回收,线程也不会终止。即使我们显式 close! 通道,线程仍会继续。我的结论正确还是我遗漏了什么?

我问这个问题是因为我正在尝试找到一种方法来成功地测试这样的 core.async 代码和如此无休止的 运行ning 消费者。我目前的尝试是这样的:

(let [c1 (chan)
      c2 (chan)]
    (go 
        (>!! c1 "hi")
        (>!! c2 "there"))
    (async/thread
      (loop [[v ch] (alts!! [c1 c2])]
        (println "Read" v "from" ch)
        (when-let [[nv nch] (alts!! [c1 c2])]
          (if nv
             (recur [nv nch])
             :done)))))

这个 return 是一个结果通道(来自 thread),我想阻止它采用 :done 值,但我需要一种关闭方式(至少一个的)渠道。我可以 return 两个频道的列表 c1, c2 和结果频道 return 由 thread 编辑然后 close! 例如c1 之后查看结果通道,但是非常难看:

(let [c1 (chan)
      c2 (chan)]
    (go 
        (>!! c1 "hi")
        (>!! c2 "there"))
    [c1 c2 (async/thread
      (loop [[v ch] (alts!! [c1 c2])]
        (println "Read" v "from" ch)
        (when-let [[nv nch] (alts!! [c1 c2])]
          (if nv
              (recur [nv nch])
              :done))))])
=> [#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@136535df>]
   Read hi from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def>
   Read there from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e>

(let [[c1 c2 resultchan] *1]
  (close! c1)
  (<!! resultchan))
=>:done

或者,我可能会发送一个特殊的 "End-of-Communication" 值,然后我可以在接收方检查它。

这方面的最佳做法是什么?

我不知道对于这种特殊情况是否有最佳实践之类的东西。这是我解决问题的方法。我觉得很简单。

(defn alts-while-open [f & chans]
   (let [a-chans (atom (set chans))]
     (go (while (< 0 (count @a-chans))
           (println "iteration started : " (vec @a-chans))
           (let [[v ch] (alts! (vec @a-chans))]
             (if v
               (f v ch)
               (swap! a-chans #(disj % ch))))))))

函数f在alts的结果上执行!在渠道畅通的同时。我在这里保留了一个带有一组开放通道的原子。一旦我找到一个关闭的频道,我就将它从这个集合中删除。如果没有更多打开的通道,则 while 循环停止。你可以 运行 它 :

(def c1 (chan))
(def c2 (chan))
(def c3 (chan))
(alts-while-open (fn [v ch] (println v)) c1 c2 c3)

现在,当向这些通道中的任何一个写入内容时,它都会被打印出来。您可以看到新的迭代在那之后开始。关闭通道后,您可以看到迭代开始,但通道向量减少了。一旦所有通道都关闭,while 循环就会停止。

是否使用close这个问题不好回答!函数或其他一些通知机制来停止循环。我认为这取决于情况。如果 "end of communication" 没有任何复杂的处理,我会关闭!这个频道。如果有更复杂的逻辑 - 例如有成功的 "end of communication" 和失败的 "end of communication" 选项,我想以不同的方式处理它们,那么我宁愿发送一条特殊消息。像

[:end-of-communication :success]

当您不发送简单值时,发送特殊 end-of-communication 的想法不起作用,因为您无法保证值将按照您希望的顺序放置和获取.

下一个想法是发送方和接收方都提前知道要处理的值的数量,例如像这样:

user> (<!!
        (let [c1 (chan)
              values ["hi" "there"]
              vcount (count values)]
           (doseq [value values]
             (thread
                  (>!! c1 value)))
           (thread
               (loop [recvalue (<!! c1)
                      reccount 1]
                  (println "Read" recvalue)
                  (if (= reccount vcount)
                      (do (close! c1)
                          :done)
                      (recur (<!! c1) (inc reccount)))))))
Read hi
Read there
:done

这原则上可行,但有一个明显的缺点,即您必须在设置发送和接收流程之前就金额达成一致,还有一个不太明显的缺点,即万一发送方出现问题,您我们最终会再次在接收端无休止地等待(假设发送端所做的不仅仅是将值放入通道)。

我得出的结论是,唯一可靠的方法是使用 timeout 频道,如下所示:

(<!! (let [c1 (chan)
           tchan (timeout 1000) 
           values ["hi" "there"]]
       (doseq [value values]
         (thread
           (>!! c1 value)))
       (thread
          (loop [[recvalue rchan] (alts!! [c1 tchan])
                 timeoutchan tchan]
            (if (= rchan timeoutchan)
                (do (close! c1)
                    :done)
                (do (println "Read" recvalue)
                    (let [newtimeout (timeout 1000)]
                        (recur (alts!! [c1 newtimeout])
                               newtimeout))))))

Read hi
Read there
:done