core.async 中的有状态传感器
Stateful transducers in core.async
我正在尝试了解如何在 core.async 中制作有状态传感器。
例如,我将如何制作一个传感器来计算通过通道的元素数量?例如,我想将输入转换为一个计数,该计数取决于之前出现的对象的数量。
据我所知,要走的路是使用 volatile!
来保持传感器内部的状态,但我仍然不确定如何将所有东西放在一起。
您需要一个有状态传感器,返回一个在 volatile!
跟踪计数时关闭的减少函数。
(defn count-xf [rf]
(let [ctr (volatile! 0)]
(fn
([] (rf))
([result] (rf result))
([result _] ; we ignore the input as
(rf result (vswap! ctr inc)))))) ; we just pass on the count
这可以使用核心函数进行简化completing
(defn count-xf [rf]
(let [ctr (volatile! 0)]
(completing
(fn [result _]
(rf result (vswap! ctr inc))))))
E. G。这么用
(let [ch (chan 1 count-xf)]
(onto-chan ch (repeat 10 true))
(<!! (clojure.core.async/into [] ch)))
;-> [1 2 3 4 5 6 7 8 9 10]
或者,您可以只使用 map-indexed
换能器,但这可能会帮助您减少对换能器工作原理的理解。对于这个特定的用例,它还需要一些额外的 per-step 开销。
(def count-xf (map-indexed (fn [i _] (inc i))))
观察其 implementation 与上面的实现几乎没有区别。
我正在尝试了解如何在 core.async 中制作有状态传感器。 例如,我将如何制作一个传感器来计算通过通道的元素数量?例如,我想将输入转换为一个计数,该计数取决于之前出现的对象的数量。
据我所知,要走的路是使用 volatile!
来保持传感器内部的状态,但我仍然不确定如何将所有东西放在一起。
您需要一个有状态传感器,返回一个在 volatile!
跟踪计数时关闭的减少函数。
(defn count-xf [rf]
(let [ctr (volatile! 0)]
(fn
([] (rf))
([result] (rf result))
([result _] ; we ignore the input as
(rf result (vswap! ctr inc)))))) ; we just pass on the count
这可以使用核心函数进行简化completing
(defn count-xf [rf]
(let [ctr (volatile! 0)]
(completing
(fn [result _]
(rf result (vswap! ctr inc))))))
E. G。这么用
(let [ch (chan 1 count-xf)]
(onto-chan ch (repeat 10 true))
(<!! (clojure.core.async/into [] ch)))
;-> [1 2 3 4 5 6 7 8 9 10]
或者,您可以只使用 map-indexed
换能器,但这可能会帮助您减少对换能器工作原理的理解。对于这个特定的用例,它还需要一些额外的 per-step 开销。
(def count-xf (map-indexed (fn [i _] (inc i))))
观察其 implementation 与上面的实现几乎没有区别。