core.async 的节流功能

Throttle Functions with core.async

应限制函数的可能执行次数。所以在调用一个函数之后,任何重复的调用都应该在一段时间内被忽略。如果中间有where调用,最后一个要在时间段后执行。

这是我使用 core.async 的方法。这里的问题是,额外的呼叫在通道 c 中汇总。我需要一个内部只有一个位置的通道,它将被 put 覆盖!每次。

(defn throttle [f time]
  (let [c (chan 1)]
    (go-loop []
      (apply f (<! c))
      (<! (timeout time))
      (recur))
    (fn [& args]
      (put! c (if args args [])))))

用法:

(def throttled (throttle #(print %) 4000))
(doseq [x (range 10)]
   (throttled x))

; 0
;... after 4 seconds
; 9

有人知道如何解决这个问题吗?

解决方案

(defn throttle [f time]
  (let [c (chan (sliding-buffer 1))]
    (go-loop []
      (apply f (<! c))
      (<! (timeout time))
      (recur))
    (fn [& args]
      (put! c (or args [])))))

您可以使用 debounce 函数。

我会复制到这里:

(defn debounce [in ms]
  (let [out (chan)]
    (go-loop [last-val nil]
      (let [val (if (nil? last-val) (<! in) last-val)
            timer (timeout ms)
            [new-val ch] (alts! [in timer])]
        (condp = ch
          timer (do (>! out val) (recur nil))
          in (recur new-val))))
    out))

这里只有当 in 没有为 ms 发出消息时,它发出的最后一个值才会转发到 out 通道。虽然 in 继续发射而发射之间没有足够长的停顿,然后所有但最后一条消息都被连续丢弃。

我测试过这个功能。它等待 4 秒,然后打印出 9,这几乎就是您所要求的 - 需要进行一些调整!

(defn my-sender [to-chan values]
  (go-loop [[x & xs] values]
           (>! to-chan x)
           (when (seq xs) (recur xs))))

(defn my-receiver [from-chan f]
  (go-loop []
           (let [res (<! from-chan)]
             (f res)
             (recur))))

(defn setup-and-go []
  (let [in (chan)
        ch (debounce in 4000)
        sender (my-sender in (range 10))
        receiver (my-receiver ch #(log %))])) 

而这个版本的debounce会按照题目的要求输出,马上0,然后等四秒,然后9:

(defn debounce [in ms]
  (let [out (chan)]
    (go-loop [last-val nil
              first-time true]
             (let [val (if (nil? last-val) (<! in) last-val)
                   timer (timeout (if first-time 0 ms))
                   [new-val ch] (alts! [in timer])]
               (condp = ch
                 timer (do (>! out val) (recur nil false))
                 in (recur new-val false))))
    out)) 

我使用了 log 而不是像您那样使用 print。您不能依赖普通的 println/print 函数和 core.async。有关解释,请参阅

要解决您的频道问题,您可以使用带有滑动缓冲区的频道:

user> (require '[clojure.core.async :as async])
nil
user> (def c (async/chan (async/sliding-buffer 1)))
#'user/c
user> (async/>!! c 1)
true
user> (async/>!! c 2)
true
user> (async/>!! c 3)
true
user> (async/<!! c)
3

那样只会在下一个时间间隔计算最后放入通道的值。

这是摘自大卫 Nolens blog's source code:

(defn throttle*
  ([in msecs]
    (throttle* in msecs (chan)))
  ([in msecs out]
    (throttle* in msecs out (chan)))
  ([in msecs out control]
    (go
      (loop [state ::init last nil cs [in control]]
        (let [[_ _ sync] cs]
          (let [[v sc] (alts! cs)]
            (condp = sc
              in (condp = state
                   ::init (do (>! out v)
                            (>! out [::throttle v])
                            (recur ::throttling last
                              (conj cs (timeout msecs))))
                   ::throttling (do (>! out v)
                                  (recur state v cs)))
              sync (if last 
                     (do (>! out [::throttle last])
                       (recur state nil
                         (conj (pop cs) (timeout msecs))))
                     (recur ::init last (pop cs)))
              control (recur ::init nil
                        (if (= (count cs) 3)
                          (pop cs)
                          cs)))))))
    out))

(defn throttle-msg? [x]
  (and (vector? x)
       (= (first x) ::throttle)))

(defn throttle
  ([in msecs] (throttle in msecs (chan)))
  ([in msecs out]
    (->> (throttle* in msecs out)
      (filter #(and (vector? %) (= (first %) ::throttle)))
      (map second))))

您可能还想在频道中添加一个 dedupe 换能器。

我需要传递一个函数来捕获参数,因为我将它用于输入事件并且它传递了一个可变对象。

(defn throttle-for-mutable-args [time f arg-capture-fn]
  (let [c (async/chan (async/sliding-buffer 1))]
    (async-m/go-loop []
      (f (async/<! c))
      (async/<! (async/timeout time))
      (recur))
    (fn [& args]
      (async/put! c (apply arg-capture-fn (or args []))))))

我喜欢用

[:input
  {:onChange (util/throttle-for-mutable-args                                      
               500
               #(really-use-arg %)                                 
               #(-> % .-target .-value))}]