如何使用 core.async 个通道对 Rx 的“withLatestFrom”进行建模?
How to model Rx's `withLatestFrom` with core.async channels?
例如,给定一个带有操作的通道和另一个带有数据的通道,如何编写一个 go
块来将操作应用于数据通道上的最后一个值?
(go-loop []
(let [op (<! op-ch)
data (<! data-ch)]
(put! result-ch (op data))))
显然这行不通,因为它要求两个频道具有相同的频率。
使用alts!
你可以完成你想要的。
下面显示的 with-latest-from
实现了与 RxJS 的 withLatestFrom
相同的行为(我认为 :P)。
(require '[clojure.core.async :as async])
(def op-ch (async/chan))
(def data-ch (async/chan))
(defn with-latest-from [chs f]
(let [result-ch (async/chan)
latest (vec (repeat (count chs) nil))
index (into {} (map vector chs (range)))]
(async/go-loop [latest latest]
(let [[value ch] (async/alts! chs)
latest (assoc latest (index ch) value)]
(when-not (some nil? latest)
(async/put! result-ch (apply f latest)))
(when value (recur latest))))
result-ch))
(def result-ch (with-latest-from [op-ch data-ch] str))
(async/go-loop []
(prn (async/<! result-ch))
(recur))
(async/put! op-ch :+)
;= true
(async/put! data-ch 1)
;= true
; ":+1"
(async/put! data-ch 2)
;= true
; ":+2"
(async/put! op-ch :-)
;= true
; ":-2"
alts!
有一个 :priority true
选项。
始终returns 某个频道中最新看到的值的表达式看起来像这样:
(def in-chan (chan))
(def mem (chan))
(go (let [[ch value] (alts! [in-chan mem] :priority true)]
(take! mem) ;; clear mem (take! is non-blocking)
(>! mem value) ;; put the new (or old) value in the mem
value ;; return a chan with the value in
它未经测试,可能效率不高(volatile
变量可能更好)。 go
-block returns 一个只有值的通道,但这个想法可以扩展到一些 "memoized" 通道。
例如,给定一个带有操作的通道和另一个带有数据的通道,如何编写一个 go
块来将操作应用于数据通道上的最后一个值?
(go-loop []
(let [op (<! op-ch)
data (<! data-ch)]
(put! result-ch (op data))))
显然这行不通,因为它要求两个频道具有相同的频率。
使用alts!
你可以完成你想要的。
下面显示的 with-latest-from
实现了与 RxJS 的 withLatestFrom
相同的行为(我认为 :P)。
(require '[clojure.core.async :as async])
(def op-ch (async/chan))
(def data-ch (async/chan))
(defn with-latest-from [chs f]
(let [result-ch (async/chan)
latest (vec (repeat (count chs) nil))
index (into {} (map vector chs (range)))]
(async/go-loop [latest latest]
(let [[value ch] (async/alts! chs)
latest (assoc latest (index ch) value)]
(when-not (some nil? latest)
(async/put! result-ch (apply f latest)))
(when value (recur latest))))
result-ch))
(def result-ch (with-latest-from [op-ch data-ch] str))
(async/go-loop []
(prn (async/<! result-ch))
(recur))
(async/put! op-ch :+)
;= true
(async/put! data-ch 1)
;= true
; ":+1"
(async/put! data-ch 2)
;= true
; ":+2"
(async/put! op-ch :-)
;= true
; ":-2"
alts!
有一个 :priority true
选项。
始终returns 某个频道中最新看到的值的表达式看起来像这样:
(def in-chan (chan))
(def mem (chan))
(go (let [[ch value] (alts! [in-chan mem] :priority true)]
(take! mem) ;; clear mem (take! is non-blocking)
(>! mem value) ;; put the new (or old) value in the mem
value ;; return a chan with the value in
它未经测试,可能效率不高(volatile
变量可能更好)。 go
-block returns 一个只有值的通道,但这个想法可以扩展到一些 "memoized" 通道。