如何使用 core.async 正确地批处理消息?

How to properly batch messages with core.async?

我想按 超时计数在 core.async 上批量发送消息(即 10 毫秒或 10 条消息,以先到者为准)。 Tim Baldridge has a video on batching,但它使用了 core.async 中已弃用的函数,并且不使用转换器。我正在寻找类似以下的内容...

(defn batch [in out max-time max-count]
  ...
 )

换能器不应该真正成为批处理功能的关注点——作为 in 通道上的接收者,它会看到该通道上任何换能器转换的值,以及收听 [=12] 的任何接收者=] 将依次看到由该通道的传感器转换的值。

至于实现,下面的函数将从 in 中获取 max-count 批项目,或者自上一批输出以来 max-time 到达的项目,然后输出它们到 out,在输入通道关闭时关闭,受输入通道的传感器影响(如果有,任何收听 out 的接受者也将如上所述应用该通道的传感器):

(defn batch [in out max-time max-count]
  (let [lim-1 (dec max-count)]
    (async/go-loop [buf [] t (async/timeout max-time)]
      (let [[v p] (async/alts! [in t])]
        (cond
          (= p t)
          (do
            (async/>! out buf)
            (recur [] (async/timeout max-time)))

          (nil? v)
          (if (seq buf)
            (async/>! out buf))

          (== (count buf) lim-1)
          (do
            (async/>! out (conj buf v))
            (recur [] (async/timeout max-time)))

          :else
          (recur (conj buf v) t))))))