当线程不断从中获取通道时,什么时候会丢弃通道?
When will channel be discarded when a thread keeps taking from it?
考虑以下摘自 example walkthrough of core.async 的代码:
(let [c1 (chan)
c2 (chan)]
(thread
(while true
(let [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch))))
(>!! c1 "hi")
(>!! c2 "there"))
我的假设是该线程具有对通道 c1
和 c2
的引用,并且基本上 运行 将永远尝试从永远不会出现的任何一个中获取值。因此,通道既不会被垃圾回收,线程也不会终止。即使我们显式 close!
通道,线程仍会继续。我的结论正确还是我遗漏了什么?
我问这个问题是因为我正在尝试找到一种方法来成功地测试这样的 core.async 代码和如此无休止的 运行ning 消费者。我目前的尝试是这样的:
(let [c1 (chan)
c2 (chan)]
(go
(>!! c1 "hi")
(>!! c2 "there"))
(async/thread
(loop [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch)
(when-let [[nv nch] (alts!! [c1 c2])]
(if nv
(recur [nv nch])
:done)))))
这个 return 是一个结果通道(来自 thread
),我想阻止它采用 :done
值,但我需要一种关闭方式(至少一个的)渠道。我可以 return 两个频道的列表 c1, c2
和结果频道 return 由 thread
编辑然后 close!
例如c1
之后查看结果通道,但是非常难看:
(let [c1 (chan)
c2 (chan)]
(go
(>!! c1 "hi")
(>!! c2 "there"))
[c1 c2 (async/thread
(loop [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch)
(when-let [[nv nch] (alts!! [c1 c2])]
(if nv
(recur [nv nch])
:done))))])
=> [#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@136535df>]
Read hi from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def>
Read there from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e>
(let [[c1 c2 resultchan] *1]
(close! c1)
(<!! resultchan))
=>:done
或者,我可能会发送一个特殊的 "End-of-Communication" 值,然后我可以在接收方检查它。
这方面的最佳做法是什么?
我不知道对于这种特殊情况是否有最佳实践之类的东西。这是我解决问题的方法。我觉得很简单。
(defn alts-while-open [f & chans]
(let [a-chans (atom (set chans))]
(go (while (< 0 (count @a-chans))
(println "iteration started : " (vec @a-chans))
(let [[v ch] (alts! (vec @a-chans))]
(if v
(f v ch)
(swap! a-chans #(disj % ch))))))))
函数f在alts的结果上执行!在渠道畅通的同时。我在这里保留了一个带有一组开放通道的原子。一旦我找到一个关闭的频道,我就将它从这个集合中删除。如果没有更多打开的通道,则 while 循环停止。你可以 运行 它 :
(def c1 (chan))
(def c2 (chan))
(def c3 (chan))
(alts-while-open (fn [v ch] (println v)) c1 c2 c3)
现在,当向这些通道中的任何一个写入内容时,它都会被打印出来。您可以看到新的迭代在那之后开始。关闭通道后,您可以看到迭代开始,但通道向量减少了。一旦所有通道都关闭,while 循环就会停止。
是否使用close这个问题不好回答!函数或其他一些通知机制来停止循环。我认为这取决于情况。如果 "end of communication" 没有任何复杂的处理,我会关闭!这个频道。如果有更复杂的逻辑 - 例如有成功的 "end of communication" 和失败的 "end of communication" 选项,我想以不同的方式处理它们,那么我宁愿发送一条特殊消息。像
[:end-of-communication :success]
当您不发送简单值时,发送特殊 end-of-communication
的想法不起作用,因为您无法保证值将按照您希望的顺序放置和获取.
下一个想法是发送方和接收方都提前知道要处理的值的数量,例如像这样:
user> (<!!
(let [c1 (chan)
values ["hi" "there"]
vcount (count values)]
(doseq [value values]
(thread
(>!! c1 value)))
(thread
(loop [recvalue (<!! c1)
reccount 1]
(println "Read" recvalue)
(if (= reccount vcount)
(do (close! c1)
:done)
(recur (<!! c1) (inc reccount)))))))
Read hi
Read there
:done
这原则上可行,但有一个明显的缺点,即您必须在设置发送和接收流程之前就金额达成一致,还有一个不太明显的缺点,即万一发送方出现问题,您我们最终会再次在接收端无休止地等待(假设发送端所做的不仅仅是将值放入通道)。
我得出的结论是,唯一可靠的方法是使用 timeout
频道,如下所示:
(<!! (let [c1 (chan)
tchan (timeout 1000)
values ["hi" "there"]]
(doseq [value values]
(thread
(>!! c1 value)))
(thread
(loop [[recvalue rchan] (alts!! [c1 tchan])
timeoutchan tchan]
(if (= rchan timeoutchan)
(do (close! c1)
:done)
(do (println "Read" recvalue)
(let [newtimeout (timeout 1000)]
(recur (alts!! [c1 newtimeout])
newtimeout))))))
Read hi
Read there
:done
考虑以下摘自 example walkthrough of core.async 的代码:
(let [c1 (chan)
c2 (chan)]
(thread
(while true
(let [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch))))
(>!! c1 "hi")
(>!! c2 "there"))
我的假设是该线程具有对通道 c1
和 c2
的引用,并且基本上 运行 将永远尝试从永远不会出现的任何一个中获取值。因此,通道既不会被垃圾回收,线程也不会终止。即使我们显式 close!
通道,线程仍会继续。我的结论正确还是我遗漏了什么?
我问这个问题是因为我正在尝试找到一种方法来成功地测试这样的 core.async 代码和如此无休止的 运行ning 消费者。我目前的尝试是这样的:
(let [c1 (chan)
c2 (chan)]
(go
(>!! c1 "hi")
(>!! c2 "there"))
(async/thread
(loop [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch)
(when-let [[nv nch] (alts!! [c1 c2])]
(if nv
(recur [nv nch])
:done)))))
这个 return 是一个结果通道(来自 thread
),我想阻止它采用 :done
值,但我需要一种关闭方式(至少一个的)渠道。我可以 return 两个频道的列表 c1, c2
和结果频道 return 由 thread
编辑然后 close!
例如c1
之后查看结果通道,但是非常难看:
(let [c1 (chan)
c2 (chan)]
(go
(>!! c1 "hi")
(>!! c2 "there"))
[c1 c2 (async/thread
(loop [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch)
(when-let [[nv nch] (alts!! [c1 c2])]
(if nv
(recur [nv nch])
:done))))])
=> [#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@136535df>]
Read hi from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def>
Read there from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e>
(let [[c1 c2 resultchan] *1]
(close! c1)
(<!! resultchan))
=>:done
或者,我可能会发送一个特殊的 "End-of-Communication" 值,然后我可以在接收方检查它。
这方面的最佳做法是什么?
我不知道对于这种特殊情况是否有最佳实践之类的东西。这是我解决问题的方法。我觉得很简单。
(defn alts-while-open [f & chans]
(let [a-chans (atom (set chans))]
(go (while (< 0 (count @a-chans))
(println "iteration started : " (vec @a-chans))
(let [[v ch] (alts! (vec @a-chans))]
(if v
(f v ch)
(swap! a-chans #(disj % ch))))))))
函数f在alts的结果上执行!在渠道畅通的同时。我在这里保留了一个带有一组开放通道的原子。一旦我找到一个关闭的频道,我就将它从这个集合中删除。如果没有更多打开的通道,则 while 循环停止。你可以 运行 它 :
(def c1 (chan))
(def c2 (chan))
(def c3 (chan))
(alts-while-open (fn [v ch] (println v)) c1 c2 c3)
现在,当向这些通道中的任何一个写入内容时,它都会被打印出来。您可以看到新的迭代在那之后开始。关闭通道后,您可以看到迭代开始,但通道向量减少了。一旦所有通道都关闭,while 循环就会停止。
是否使用close这个问题不好回答!函数或其他一些通知机制来停止循环。我认为这取决于情况。如果 "end of communication" 没有任何复杂的处理,我会关闭!这个频道。如果有更复杂的逻辑 - 例如有成功的 "end of communication" 和失败的 "end of communication" 选项,我想以不同的方式处理它们,那么我宁愿发送一条特殊消息。像
[:end-of-communication :success]
当您不发送简单值时,发送特殊 end-of-communication
的想法不起作用,因为您无法保证值将按照您希望的顺序放置和获取.
下一个想法是发送方和接收方都提前知道要处理的值的数量,例如像这样:
user> (<!!
(let [c1 (chan)
values ["hi" "there"]
vcount (count values)]
(doseq [value values]
(thread
(>!! c1 value)))
(thread
(loop [recvalue (<!! c1)
reccount 1]
(println "Read" recvalue)
(if (= reccount vcount)
(do (close! c1)
:done)
(recur (<!! c1) (inc reccount)))))))
Read hi
Read there
:done
这原则上可行,但有一个明显的缺点,即您必须在设置发送和接收流程之前就金额达成一致,还有一个不太明显的缺点,即万一发送方出现问题,您我们最终会再次在接收端无休止地等待(假设发送端所做的不仅仅是将值放入通道)。
我得出的结论是,唯一可靠的方法是使用 timeout
频道,如下所示:
(<!! (let [c1 (chan)
tchan (timeout 1000)
values ["hi" "there"]]
(doseq [value values]
(thread
(>!! c1 value)))
(thread
(loop [[recvalue rchan] (alts!! [c1 tchan])
timeoutchan tchan]
(if (= rchan timeoutchan)
(do (close! c1)
:done)
(do (println "Read" recvalue)
(let [newtimeout (timeout 1000)]
(recur (alts!! [c1 newtimeout])
newtimeout))))))
Read hi
Read there
:done