使用core.asyncpub/sub时如何避免掉落物品?

How to avoid dropping items when using core.async pub/sub?

我有一个频道充当发布者:

(def publisher (async/chan))
(def publication (async/pub publisher :topic))

由于 sub/pub 的性质,当我这样做时:

(async/put! publisher {:topic :foo})

消息被发布消耗,并且由于没有订阅者,它将被丢弃。

如果我尝试订阅 :foo 主题:

(def reader (async/chan))
(async/sub publication :foo reader)
(async/go (println "got val " (async/<! reader)))

我不会看到任何打印内容。但是如果我在发布者中放置更多项目:

(async/put! c1 {:topic :foo :msg "after"})
==> got val {:topic :foo :msg "after"}

有没有办法不丢失发布者最后 n 个项目即使订阅者还没有订阅

documentation 显式:

Items received when there are no matching subs get dropped

pub 接受给定主题的 buf-fn 函数。这个函数应该return一个缓冲区。如dropping-buffersliding-buffer。因此,如果您希望缓冲 :foo 主题:

(pub pub-ch :topic #(if (= % :foo) (sliding-buffer 10) nil))

另见 relevant code section