core.async 的节流功能
Throttle Functions with core.async
应限制函数的可能执行次数。所以在调用一个函数之后,任何重复的调用都应该在一段时间内被忽略。如果中间有where调用,最后一个要在时间段后执行。
这是我使用 core.async 的方法。这里的问题是,额外的呼叫在通道 c 中汇总。我需要一个内部只有一个位置的通道,它将被 put 覆盖!每次。
(defn throttle [f time]
(let [c (chan 1)]
(go-loop []
(apply f (<! c))
(<! (timeout time))
(recur))
(fn [& args]
(put! c (if args args [])))))
用法:
(def throttled (throttle #(print %) 4000))
(doseq [x (range 10)]
(throttled x))
; 0
;... after 4 seconds
; 9
有人知道如何解决这个问题吗?
解决方案
(defn throttle [f time]
(let [c (chan (sliding-buffer 1))]
(go-loop []
(apply f (<! c))
(<! (timeout time))
(recur))
(fn [& args]
(put! c (or args [])))))
您可以使用 debounce 函数。
我会复制到这里:
(defn debounce [in ms]
(let [out (chan)]
(go-loop [last-val nil]
(let [val (if (nil? last-val) (<! in) last-val)
timer (timeout ms)
[new-val ch] (alts! [in timer])]
(condp = ch
timer (do (>! out val) (recur nil))
in (recur new-val))))
out))
这里只有当 in
没有为 ms
发出消息时,它发出的最后一个值才会转发到 out
通道。虽然 in
继续发射而发射之间没有足够长的停顿,然后所有但最后一条消息都被连续丢弃。
我测试过这个功能。它等待 4 秒,然后打印出 9
,这几乎就是您所要求的 - 需要进行一些调整!
(defn my-sender [to-chan values]
(go-loop [[x & xs] values]
(>! to-chan x)
(when (seq xs) (recur xs))))
(defn my-receiver [from-chan f]
(go-loop []
(let [res (<! from-chan)]
(f res)
(recur))))
(defn setup-and-go []
(let [in (chan)
ch (debounce in 4000)
sender (my-sender in (range 10))
receiver (my-receiver ch #(log %))]))
而这个版本的debounce
会按照题目的要求输出,马上0,然后等四秒,然后9:
(defn debounce [in ms]
(let [out (chan)]
(go-loop [last-val nil
first-time true]
(let [val (if (nil? last-val) (<! in) last-val)
timer (timeout (if first-time 0 ms))
[new-val ch] (alts! [in timer])]
(condp = ch
timer (do (>! out val) (recur nil false))
in (recur new-val false))))
out))
我使用了 log
而不是像您那样使用 print
。您不能依赖普通的 println/print
函数和 core.async
。有关解释,请参阅 。
要解决您的频道问题,您可以使用带有滑动缓冲区的频道:
user> (require '[clojure.core.async :as async])
nil
user> (def c (async/chan (async/sliding-buffer 1)))
#'user/c
user> (async/>!! c 1)
true
user> (async/>!! c 2)
true
user> (async/>!! c 3)
true
user> (async/<!! c)
3
那样只会在下一个时间间隔计算最后放入通道的值。
这是摘自大卫 Nolens blog's source code:
(defn throttle*
([in msecs]
(throttle* in msecs (chan)))
([in msecs out]
(throttle* in msecs out (chan)))
([in msecs out control]
(go
(loop [state ::init last nil cs [in control]]
(let [[_ _ sync] cs]
(let [[v sc] (alts! cs)]
(condp = sc
in (condp = state
::init (do (>! out v)
(>! out [::throttle v])
(recur ::throttling last
(conj cs (timeout msecs))))
::throttling (do (>! out v)
(recur state v cs)))
sync (if last
(do (>! out [::throttle last])
(recur state nil
(conj (pop cs) (timeout msecs))))
(recur ::init last (pop cs)))
control (recur ::init nil
(if (= (count cs) 3)
(pop cs)
cs)))))))
out))
(defn throttle-msg? [x]
(and (vector? x)
(= (first x) ::throttle)))
(defn throttle
([in msecs] (throttle in msecs (chan)))
([in msecs out]
(->> (throttle* in msecs out)
(filter #(and (vector? %) (= (first %) ::throttle)))
(map second))))
您可能还想在频道中添加一个 dedupe
换能器。
我需要传递一个函数来捕获参数,因为我将它用于输入事件并且它传递了一个可变对象。
(defn throttle-for-mutable-args [time f arg-capture-fn]
(let [c (async/chan (async/sliding-buffer 1))]
(async-m/go-loop []
(f (async/<! c))
(async/<! (async/timeout time))
(recur))
(fn [& args]
(async/put! c (apply arg-capture-fn (or args []))))))
我喜欢用
[:input
{:onChange (util/throttle-for-mutable-args
500
#(really-use-arg %)
#(-> % .-target .-value))}]
应限制函数的可能执行次数。所以在调用一个函数之后,任何重复的调用都应该在一段时间内被忽略。如果中间有where调用,最后一个要在时间段后执行。
这是我使用 core.async 的方法。这里的问题是,额外的呼叫在通道 c 中汇总。我需要一个内部只有一个位置的通道,它将被 put 覆盖!每次。
(defn throttle [f time]
(let [c (chan 1)]
(go-loop []
(apply f (<! c))
(<! (timeout time))
(recur))
(fn [& args]
(put! c (if args args [])))))
用法:
(def throttled (throttle #(print %) 4000))
(doseq [x (range 10)]
(throttled x))
; 0
;... after 4 seconds
; 9
有人知道如何解决这个问题吗?
解决方案
(defn throttle [f time]
(let [c (chan (sliding-buffer 1))]
(go-loop []
(apply f (<! c))
(<! (timeout time))
(recur))
(fn [& args]
(put! c (or args [])))))
您可以使用 debounce 函数。
我会复制到这里:
(defn debounce [in ms]
(let [out (chan)]
(go-loop [last-val nil]
(let [val (if (nil? last-val) (<! in) last-val)
timer (timeout ms)
[new-val ch] (alts! [in timer])]
(condp = ch
timer (do (>! out val) (recur nil))
in (recur new-val))))
out))
这里只有当 in
没有为 ms
发出消息时,它发出的最后一个值才会转发到 out
通道。虽然 in
继续发射而发射之间没有足够长的停顿,然后所有但最后一条消息都被连续丢弃。
我测试过这个功能。它等待 4 秒,然后打印出 9
,这几乎就是您所要求的 - 需要进行一些调整!
(defn my-sender [to-chan values]
(go-loop [[x & xs] values]
(>! to-chan x)
(when (seq xs) (recur xs))))
(defn my-receiver [from-chan f]
(go-loop []
(let [res (<! from-chan)]
(f res)
(recur))))
(defn setup-and-go []
(let [in (chan)
ch (debounce in 4000)
sender (my-sender in (range 10))
receiver (my-receiver ch #(log %))]))
而这个版本的debounce
会按照题目的要求输出,马上0,然后等四秒,然后9:
(defn debounce [in ms]
(let [out (chan)]
(go-loop [last-val nil
first-time true]
(let [val (if (nil? last-val) (<! in) last-val)
timer (timeout (if first-time 0 ms))
[new-val ch] (alts! [in timer])]
(condp = ch
timer (do (>! out val) (recur nil false))
in (recur new-val false))))
out))
我使用了 log
而不是像您那样使用 print
。您不能依赖普通的 println/print
函数和 core.async
。有关解释,请参阅
要解决您的频道问题,您可以使用带有滑动缓冲区的频道:
user> (require '[clojure.core.async :as async])
nil
user> (def c (async/chan (async/sliding-buffer 1)))
#'user/c
user> (async/>!! c 1)
true
user> (async/>!! c 2)
true
user> (async/>!! c 3)
true
user> (async/<!! c)
3
那样只会在下一个时间间隔计算最后放入通道的值。
这是摘自大卫 Nolens blog's source code:
(defn throttle*
([in msecs]
(throttle* in msecs (chan)))
([in msecs out]
(throttle* in msecs out (chan)))
([in msecs out control]
(go
(loop [state ::init last nil cs [in control]]
(let [[_ _ sync] cs]
(let [[v sc] (alts! cs)]
(condp = sc
in (condp = state
::init (do (>! out v)
(>! out [::throttle v])
(recur ::throttling last
(conj cs (timeout msecs))))
::throttling (do (>! out v)
(recur state v cs)))
sync (if last
(do (>! out [::throttle last])
(recur state nil
(conj (pop cs) (timeout msecs))))
(recur ::init last (pop cs)))
control (recur ::init nil
(if (= (count cs) 3)
(pop cs)
cs)))))))
out))
(defn throttle-msg? [x]
(and (vector? x)
(= (first x) ::throttle)))
(defn throttle
([in msecs] (throttle in msecs (chan)))
([in msecs out]
(->> (throttle* in msecs out)
(filter #(and (vector? %) (= (first %) ::throttle)))
(map second))))
您可能还想在频道中添加一个 dedupe
换能器。
我需要传递一个函数来捕获参数,因为我将它用于输入事件并且它传递了一个可变对象。
(defn throttle-for-mutable-args [time f arg-capture-fn]
(let [c (async/chan (async/sliding-buffer 1))]
(async-m/go-loop []
(f (async/<! c))
(async/<! (async/timeout time))
(recur))
(fn [& args]
(async/put! c (apply arg-capture-fn (or args []))))))
我喜欢用
[:input
{:onChange (util/throttle-for-mutable-args
500
#(really-use-arg %)
#(-> % .-target .-value))}]