在 Clojure 中计划执行函数

Scheduled execution of a function in Clojure

我想在 Clojure 中创建一个 batch 函数:

给定一个 delay-ms,它会在该时间段内批量调用函数 f,并一次性发送。

这是一个天真的实现:

(defn delay-f
  [f delay-ms]
  (let [timer (Timer.)
        task (atom nil)
        latest-batch (atom [])]
    (fn [& args]
      (swap! latest-batch conj args)
      (when-not @task
        (let [new-task (proxy [TimerTask] []
                         (run []
                           (f @latest-batch)
                           (reset! task nil)
                           (reset! latest-batch [])
                           (.purge timer)))]
          (reset! task new-task)
          (.schedule timer new-task delay-ms))))))

我很确定,鉴于我在这里使用原子,存在竞争条件。

这里的惯用解决方案是什么?

我认为解决这个问题的最佳方法是使用定时器库overtone/at-at instead of reinventing the wheel. In particular, the function overtone.at-at/every 提供您想要的调度。

定义一个原子来保存要执行的累积任务(a.k.a“thunks”),以及一个函数来将新任务追加到队列中。

定义一个传递给 every 的“执行”函数,它将清除队列并按顺序执行在那里找到的每个任务。

Clojure 原子将防止任何竞争条件,因为在任何给定时间点只允许执行“追加”函数或“执行”函数。如果两个函数都尝试同时修改原子内容,则其中一个将被迫等待另一个完成。


对于替代库,请参阅主题 Scheduling 下的 Clojure-Toolbox

只要在 TimerTask 仍在执行时调用该函数,您的代码就会可靠地中断。首先,您想在 运行 函数之前而不是之后重置任务和最新批次原子。这仍然会有竞争条件,尽管可能性较小。我们可以改用 ConcurrentLinkedQueue:

(defn delay-f
  [f delay-ms]
  (let [timer (java.util.Timer.)
        task (atom nil)
        queue (java.util.concurrent.ConcurrentLinkedQueue.)]
    (fn [& args]
      (.offer queue args)
      (when-not @task
        (let [new-task (proxy [java.util.TimerTask] []
                         (run []
                           (reset! task nil)
                           (f (loop [r []]
                                (if-let  [e (.poll queue)]
                                  (recur (conj r e))
                                  r)))
                           (.purge timer)))]
          (reset! task new-task)
          (.schedule timer new-task delay-ms))))))