如何使用 core.async 代替回调?

How to use core.async in place of callbacks?

我在 ClojureScript 上使用 core.async 以避免使用 node.js 回调。问题是我达到了 1024 条待处理消息的限制。

为了避免这种情况,我需要将所有消息发送到同一 go 块内的频道。但这在 core.async 上是不可能的,因为匿名函数使 go 的效果无效,所以我不能这样做:

(go
  (. socket on "data" #(>! chan %)))

那么,有没有办法绕过这个限制?

我不确定我是否理解为什么它必须在同一个 go 块中。通常,你会这样做:

(. socket on "data" #(go (put! chan %)))

然后在另一个go块中处理chan

为了模拟错误,我们可以"simulate"一种回调:

(let [callback (atom nil)
      on-data (fn [fun]
                (reset! callback fun))
      chan (async/chan)]

如果我们尝试使用 (on-data #(async/go (async/put! chan %))) 添加回调,则会超出限制。此外,由于我们正在使用 async/go,它会导致频道内的消息乱序。

我发现解决这个问题的唯一方法是在 atom 中创建一个 promise-chan 的无限列表,每个回调都会选取第一个元素,发布一条消息,然后删除首先是列表。然后,我们可以在 go 块中有一个 doseq 来为我们发布消息:

(let [inf-list (atom (map (fn [_] (async/promise-chan)) (range)))]
  ; We need to iterate over inf-list before anything
  (let [lst @inf-list]
    (async/go
     (doseq [c lst]
       (async/>! chan (async/<! c)))))

  (on-data #(do
              (async/put! (first @inf-list) %)
              (swap! inf-list rest))))