将转换器应用于事件回调流

Apply transducer to stream of event callbacks

我正在写一些 clojurescript,它正在做一些日志处理。我正在包装一个 Javascript 库,每当有新的日志条目到达时,它都会给我一个回调,即

(.on my-logs-source "log-entry" handle-log-event)

我想使用 clojure 的 partition-by 函数之类的东西对这些事件执行一些简单的聚合,该函数 returns 是一个转换器,并获得结果的向量。将我的事件回调流转换为我可以应用传感器的东西的惯用方法是什么?

您可以使用 clojure.async - 它的通道在构建时可选择接受传感器。

下面的示例说明了如何实现您的目标。

(require '[clojure.core.async :as async])

(def ch (async/chan 1 (partition-by odd?)))

(def callback (fn [n] (async/put! ch n)))

(async/go-loop []
  (when-some [v (async/<! ch)]
    (println "Got" v)
    (recur)))

(callback 1)
(callback 1)
(callback 2)
(callback 2)
(callback 3)
(callback 4)

上面的代码将创建一个带有传感器的通道。您的回调函数会将收到的所有值发送到该通道。 go 块将在通道可用时使用它们。从通道消耗的值是换能器产生的结果。

对于上面的示例 REPL 会话,go 块的控制台输出如下:

Got [1 1]
Got [2 2]
Got [3]

现在当您关闭频道时:

(async/close! ch)

剩余数据将 "flushed" 来自您的换能器:

Got [4]