如何使用 core.async 正确地批处理消息?
How to properly batch messages with core.async?
我想按 和 超时计数在 core.async 上批量发送消息(即 10 毫秒或 10 条消息,以先到者为准)。 Tim Baldridge has a video on batching,但它使用了 core.async 中已弃用的函数,并且不使用转换器。我正在寻找类似以下的内容...
(defn batch [in out max-time max-count]
...
)
换能器不应该真正成为批处理功能的关注点——作为 in
通道上的接收者,它会看到该通道上任何换能器转换的值,以及收听 [=12] 的任何接收者=] 将依次看到由该通道的传感器转换的值。
至于实现,下面的函数将从 in
中获取 max-count
批项目,或者自上一批输出以来 max-time
到达的项目,然后输出它们到 out
,在输入通道关闭时关闭,受输入通道的传感器影响(如果有,任何收听 out
的接受者也将如上所述应用该通道的传感器):
(defn batch [in out max-time max-count]
(let [lim-1 (dec max-count)]
(async/go-loop [buf [] t (async/timeout max-time)]
(let [[v p] (async/alts! [in t])]
(cond
(= p t)
(do
(async/>! out buf)
(recur [] (async/timeout max-time)))
(nil? v)
(if (seq buf)
(async/>! out buf))
(== (count buf) lim-1)
(do
(async/>! out (conj buf v))
(recur [] (async/timeout max-time)))
:else
(recur (conj buf v) t))))))
我想按 和 超时计数在 core.async 上批量发送消息(即 10 毫秒或 10 条消息,以先到者为准)。 Tim Baldridge has a video on batching,但它使用了 core.async 中已弃用的函数,并且不使用转换器。我正在寻找类似以下的内容...
(defn batch [in out max-time max-count]
...
)
换能器不应该真正成为批处理功能的关注点——作为 in
通道上的接收者,它会看到该通道上任何换能器转换的值,以及收听 [=12] 的任何接收者=] 将依次看到由该通道的传感器转换的值。
至于实现,下面的函数将从 in
中获取 max-count
批项目,或者自上一批输出以来 max-time
到达的项目,然后输出它们到 out
,在输入通道关闭时关闭,受输入通道的传感器影响(如果有,任何收听 out
的接受者也将如上所述应用该通道的传感器):
(defn batch [in out max-time max-count]
(let [lim-1 (dec max-count)]
(async/go-loop [buf [] t (async/timeout max-time)]
(let [[v p] (async/alts! [in t])]
(cond
(= p t)
(do
(async/>! out buf)
(recur [] (async/timeout max-time)))
(nil? v)
(if (seq buf)
(async/>! out buf))
(== (count buf) lim-1)
(do
(async/>! out (conj buf v))
(recur [] (async/timeout max-time)))
:else
(recur (conj buf v) t))))))