如何使用 core.async 代替回调?
How to use core.async in place of callbacks?
我在 ClojureScript 上使用 core.async 以避免使用 node.js 回调。问题是我达到了 1024 条待处理消息的限制。
为了避免这种情况,我需要将所有消息发送到同一 go
块内的频道。但这在 core.async 上是不可能的,因为匿名函数使 go
的效果无效,所以我不能这样做:
(go
(. socket on "data" #(>! chan %)))
那么,有没有办法绕过这个限制?
我不确定我是否理解为什么它必须在同一个 go 块中。通常,你会这样做:
(. socket on "data" #(go (put! chan %)))
然后在另一个go块中处理chan
为了模拟错误,我们可以"simulate"一种回调:
(let [callback (atom nil)
on-data (fn [fun]
(reset! callback fun))
chan (async/chan)]
如果我们尝试使用 (on-data #(async/go (async/put! chan %)))
添加回调,则会超出限制。此外,由于我们正在使用 async/go
,它会导致频道内的消息乱序。
我发现解决这个问题的唯一方法是在 atom
中创建一个 promise-chan
的无限列表,每个回调都会选取第一个元素,发布一条消息,然后删除首先是列表。然后,我们可以在 go
块中有一个 doseq
来为我们发布消息:
(let [inf-list (atom (map (fn [_] (async/promise-chan)) (range)))]
; We need to iterate over inf-list before anything
(let [lst @inf-list]
(async/go
(doseq [c lst]
(async/>! chan (async/<! c)))))
(on-data #(do
(async/put! (first @inf-list) %)
(swap! inf-list rest))))
我在 ClojureScript 上使用 core.async 以避免使用 node.js 回调。问题是我达到了 1024 条待处理消息的限制。
为了避免这种情况,我需要将所有消息发送到同一 go
块内的频道。但这在 core.async 上是不可能的,因为匿名函数使 go
的效果无效,所以我不能这样做:
(go
(. socket on "data" #(>! chan %)))
那么,有没有办法绕过这个限制?
我不确定我是否理解为什么它必须在同一个 go 块中。通常,你会这样做:
(. socket on "data" #(go (put! chan %)))
然后在另一个go块中处理chan
为了模拟错误,我们可以"simulate"一种回调:
(let [callback (atom nil)
on-data (fn [fun]
(reset! callback fun))
chan (async/chan)]
如果我们尝试使用 (on-data #(async/go (async/put! chan %)))
添加回调,则会超出限制。此外,由于我们正在使用 async/go
,它会导致频道内的消息乱序。
我发现解决这个问题的唯一方法是在 atom
中创建一个 promise-chan
的无限列表,每个回调都会选取第一个元素,发布一条消息,然后删除首先是列表。然后,我们可以在 go
块中有一个 doseq
来为我们发布消息:
(let [inf-list (atom (map (fn [_] (async/promise-chan)) (range)))]
; We need to iterate over inf-list before anything
(let [lst @inf-list]
(async/go
(doseq [c lst]
(async/>! chan (async/<! c)))))
(on-data #(do
(async/put! (first @inf-list) %)
(swap! inf-list rest))))