core.async 循环阻塞等待从通道读取
core.async loop blocked waiting to read from channel
假设我有一个频道 out (chan)
。我需要获取放入通道的值并添加它们。值的数量是不确定的(因此不能使用带有 (<! out)
的结束案例的传统循环)并且来自外部 IO。我使用固定的 timeout
和 alts!
但这似乎不是解决问题的最佳方法。到目前为止,我得到了以下内容(我从 https://gist.github.com/schaueho/5726a96641693dce3e47 得到的)
(go-loop
[[v ch] (alts! [out (timeout 1000)])
acc 0]
(if-not v
(do (close! out)
(deliver p acc))
(do
(>! task-ch (as/progress-tick))
(recur (alts! [out (timeout 1000)]) (+ acc v)))))
我遇到的问题是 1000 的超时有时是不够的,会导致 go-loop 过早退出(因为 IO 操作可能需要超过 1000 毫秒才能完成并将 val 放入out
频道)。我不认为增加超时值是个好主意,因为它可能会导致我等待的时间超过必要的时间。
保证所有从 out 通道读取并正确退出循环的最佳方法是什么?
更新:
为什么我要使用超时?
因为放入通道的值的数量是不固定的;这意味着,我无法创建退出案例。 W/o 退出情况下,go-loop 将无限期地停放等待 ((<! out)
) 将值放入通道输出。如果你有一个没有超时的解决方案,那就太棒了。
我怎么知道我读到了最后一个值?
我不。那就是问题所在。这就是我使用超时和 alts 的原因!!退出循环。
你想对结果做什么?
现在简单添加。然而,这不是最重要的一点。
最终更新:
我想出了一种方法来获取我要处理的值的数量。所以我修改了我的逻辑来利用它。我仍然会使用超时和 alts!以防止任何锁定。
(go-loop
[[v _] (alts! [out (timeout 1000)])
i 0
acc 0]
(if (and v (not= n i))
(do
(>! task-ch (as/progress-tick))
(recur (alts! [out (timeout 1000)]) (inc i) (+ acc v)))
(do (close! out)
(deliver p* (if (= n i) acc nil)))))
我认为您的问题有点 higher-up 在您的设计中,而不是 core-async 具体问题:
一方面,您有不确定数量的值进入通道 — 可能有 0 个,可能有 10 个,可能有 1,000,000 个。
另一方面,您想读取其中的 所有 ,进行一些计算,然后 return。这是不可能的——除非有其他信号可以用来表示 "I think I'm done now".
如果该信号是值的时间,那么您使用 alts!
的方法是正确的,尽管我相信代码可以稍微简化。
更新:您可以访问 "upstream" IO 吗?当 IO 操作完成时,您可以将标记值(例如 ::closed
之类的东西)放入通道吗?
'best' 方法是等待来自 out 的特殊批处理结束消息或 out 被发送者关闭以标记输入结束。
无论哪种方式,解决方案取决于发件人就输入传达一些信息。
假设我有一个频道 out (chan)
。我需要获取放入通道的值并添加它们。值的数量是不确定的(因此不能使用带有 (<! out)
的结束案例的传统循环)并且来自外部 IO。我使用固定的 timeout
和 alts!
但这似乎不是解决问题的最佳方法。到目前为止,我得到了以下内容(我从 https://gist.github.com/schaueho/5726a96641693dce3e47 得到的)
(go-loop
[[v ch] (alts! [out (timeout 1000)])
acc 0]
(if-not v
(do (close! out)
(deliver p acc))
(do
(>! task-ch (as/progress-tick))
(recur (alts! [out (timeout 1000)]) (+ acc v)))))
我遇到的问题是 1000 的超时有时是不够的,会导致 go-loop 过早退出(因为 IO 操作可能需要超过 1000 毫秒才能完成并将 val 放入out
频道)。我不认为增加超时值是个好主意,因为它可能会导致我等待的时间超过必要的时间。
保证所有从 out 通道读取并正确退出循环的最佳方法是什么?
更新:
为什么我要使用超时?
因为放入通道的值的数量是不固定的;这意味着,我无法创建退出案例。 W/o 退出情况下,go-loop 将无限期地停放等待 ((<! out)
) 将值放入通道输出。如果你有一个没有超时的解决方案,那就太棒了。
我怎么知道我读到了最后一个值? 我不。那就是问题所在。这就是我使用超时和 alts 的原因!!退出循环。
你想对结果做什么? 现在简单添加。然而,这不是最重要的一点。
最终更新:
我想出了一种方法来获取我要处理的值的数量。所以我修改了我的逻辑来利用它。我仍然会使用超时和 alts!以防止任何锁定。
(go-loop
[[v _] (alts! [out (timeout 1000)])
i 0
acc 0]
(if (and v (not= n i))
(do
(>! task-ch (as/progress-tick))
(recur (alts! [out (timeout 1000)]) (inc i) (+ acc v)))
(do (close! out)
(deliver p* (if (= n i) acc nil)))))
我认为您的问题有点 higher-up 在您的设计中,而不是 core-async 具体问题:
一方面,您有不确定数量的值进入通道 — 可能有 0 个,可能有 10 个,可能有 1,000,000 个。
另一方面,您想读取其中的 所有 ,进行一些计算,然后 return。这是不可能的——除非有其他信号可以用来表示 "I think I'm done now".
如果该信号是值的时间,那么您使用 alts!
的方法是正确的,尽管我相信代码可以稍微简化。
更新:您可以访问 "upstream" IO 吗?当 IO 操作完成时,您可以将标记值(例如 ::closed
之类的东西)放入通道吗?
'best' 方法是等待来自 out 的特殊批处理结束消息或 out 被发送者关闭以标记输入结束。
无论哪种方式,解决方案取决于发件人就输入传达一些信息。