core.async 循环阻塞等待从通道读取

core.async loop blocked waiting to read from channel

假设我有一个频道 out (chan)。我需要获取放入通道的值并添加它们。值的数量是不确定的(因此不能使用带有 (<! out) 的结束案例的传统循环)并且来自外部 IO。我使用固定的 timeoutalts! 但这似乎不是解决问题的最佳方法。到目前为止,我得到了以下内容(我从 https://gist.github.com/schaueho/5726a96641693dce3e47 得到的)

(go-loop
      [[v ch] (alts! [out (timeout 1000)])
       acc 0]
      (if-not v
        (do (close! out)
            (deliver p acc))
        (do
          (>! task-ch (as/progress-tick))
          (recur (alts! [out (timeout 1000)]) (+ acc v)))))

我遇到的问题是 1000 的超时有时是不够的,会导致 go-loop 过早退出(因为 IO 操作可能需要超过 1000 毫秒才能完成并将 val 放入out 频道)。我不认为增加超时值是个好主意,因为它可能会导致我等待的时间超过必要的时间。

保证所有从 out 通道读取并正确退出循环的最佳方法是什么?

更新:

为什么我要使用超时? 因为放入通道的值的数量是不固定的;这意味着,我无法创建退出案例。 W/o 退出情况下,go-loop 将无限期地停放等待 ((<! out)) 将值放入通道输出。如果你有一个没有超时的解决方案,那就太棒了。

我怎么知道我读到了最后一个值? 我不。那就是问题所在。这就是我使用超时和 alts 的原因!!退出循环。

你想对结果做什么? 现在简单添加。然而,这不是最重要的一点。

最终更新:

我想出了一种方法来获取我要处理的值的数量。所以我修改了我的逻辑来利用它。我仍然会使用超时和 alts!以防止任何锁定。

(go-loop
     [[v _] (alts! [out (timeout 1000)])
      i 0
      acc 0]
      (if (and v (not= n i))
        (do
          (>! task-ch (as/progress-tick))
          (recur (alts! [out (timeout 1000)]) (inc i) (+ acc v)))
        (do (close! out)
            (deliver p* (if (= n i) acc nil)))))

我认为您的问题有点 higher-up 在您的设计中,而不是 core-async 具体问题:

一方面,您有不确定数量的值进入通道 — 可能有 0 个,可能有 10 个,可能有 1,000,000 个。

另一方面,您想读取其中的 所有 ,进行一些计算,然后 return。这是不可能的——除非有其他信号可以用来表示 "I think I'm done now".

如果该信号是值的时间,那么您使用 alts! 的方法是正确的,尽管我相信代码可以稍微简化。

更新:您可以访问 "upstream" IO 吗?当 IO 操作完成时,您可以将标记值(例如 ::closed 之类的东西)放入通道吗?

'best' 方法是等待来自 out 的特殊批处理结束消息或 out 被发送者关闭以标记输入结束。

无论哪种方式,解决方案取决于发件人就输入传达一些信息。