如何使用 Clojure Core Async 安排相隔 `n` 秒的函数列表

How to schedule a list of functions `n` seconds apart with Clojure Core Async

在我的 Clojure 项目中,我试图列出对 API 的 http 调用列表,该列表具有限速器,每分钟仅允许 n 次调用。我希望在所有 http 调用完成后对每个响应进行 returned 以进行进一步处理。我是 Clojure 的 Core Async 的新手,但认为它很合适,但因为我需要 运行 每次调用 n 秒,所以我也在尝试使用 Chime 库。在 Chime 的库中,它有使用 Core Async 的示例,但这些示例在每个时间间隔都调用相同的函数,这不适用于此用例。

虽然可能有一种使用 chime-async 的方法可以更好地服务于这个用例,但我的所有尝试都失败了,所以我尝试简单地用核心异步包装 Chime 调用,但我可能Core Async 比 Chime 更令人困惑。

这是我名字的例子space。

(ns mp.util.schedule
  (:require [chime.core :as chime]
            [clojure.core.async :as a]
            [tick.alpha.api :as tick]))

(defn schedule-fns
  "Takes a list of functions and a duration in seconds then runs each function in the list `sec` seconds apart
   optionally provide an inst to start from"
  [fs sec & [{:keys [inst] :or {inst (tick/now)}}]]
  (let [ch (a/chan (count fs))
        chime-times (map-indexed
                      (fn mapped-fn [i f]
                        (a/put! ch (chime/chime-at [(.plusSeconds inst (* i sec))]
                                                   (fn wrapped-fn [_] (f)))))
                      fs)]
    (doseq [chi chime-times]
      (a/<!! chi))))
; === Test Code ===

; simple test function
(defn sim-fn
  "simple function that prints a message and value, then returns the value"
  [v m]
  (println m :at (tick/now))
  v)

; list of test functions
(def fns [#(sim-fn 1 :one)
          #(sim-fn 2 :two)
          #(sim-fn 3 :three)])

调用 (schedule-fns fns 2) 时我想要发生的是每个函数在 fns 到 运行 n 秒之间以及 schedule-fns 到return (1 2 3)(函数的 return 值),但这不是它正在做的。它在正确的时间调用每个函数(我可以从日志语句中看到)但它没有 returning 任何东西并且有一个我不明白的错误。我得到:

(schedule-fns fns 2)
:one :at #time/instant "2021-03-05T23:31:52.565Z"
Execution error (IllegalArgumentException) at clojure.core.async.impl.protocols/eval11496$fn$G (protocols.clj:15).
No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: java.lang.Boolean
:two :at #time/instant "2021-03-05T23:31:54.568Z"
:three :at #time/instant "2021-03-05T23:31:56.569Z"

如果我能得到帮助让我的代码正确使用 Core Async(有或没有 Chime),我将不胜感激。谢谢。

试试这个:

(defn sim-fn
  "simple function that prints a message and value, then returns the value"
  [v m]
  (println m)
  v)

; list of test functions
(def fns [#(sim-fn 1 :one)
          #(sim-fn 2 :two)
          #(sim-fn 3 :three)])

(defn schedule-fns [fns sec]
       (let [program (interpose #(Thread/sleep (* sec 1000))
                                fns)]
         (remove #(= % nil)
                 (for [p program]
                         (p)))))

然后调用:

> (schedule-fns fns 2)
:one
:two
:three
=> (1 2 3)

我想出了一个方法来获得我想要的...但有一些注意事项。

(def results (atom []))

(defn schedule-fns
  "Takes a list of functions and a duration in seconds then runs each function in the list `sec` seconds apart
   optionally provide an inst to start from"
  [fs sec]
  (let [ch (chan (count fs))]
    (go-loop []
      (swap! results conj (<! ch))
      (recur))
    (map-indexed (fn [i f]
                   (println :waiting (* i sec) :seconds)
                   (go (<! (timeout (* i sec 1000)))
                       (>! ch (f))))
                 fs)))

此代码具有我想要的时间和行为,但我必须使用 atom 来存储响应。虽然我可以添加一个观察者来确定所有结果何时出现,但我仍然觉得我不应该这样做。

我想我现在会使用它,但在某些时候我会继续努力,如果有人有比这种方法更好的东西,我很乐意看到它。

我有几个朋友看过这个,他们每个人都给出了不同的答案。这些肯定比我做的要好。

(defn schedule-fns [fs secs]
  (let [ret (atom {})
        sink (a/chan)]
    (doseq [[n f] (map-indexed vector fs)]
      (a/thread (a/<!! (a/timeout (* 1000 n secs)))
                (let [val (f)
                      this-ret (swap! ret assoc n val)]
                  (when (= (count fs) (count this-ret))
                    (a/>!! sink (mapv (fn [i] (get this-ret i)) (range (count fs))))))))
    (a/<!! sink)))

(defn schedule-fns
  [fns sec]
  (let [concurrent (count fns)
        output-chan (a/chan)
        timedout-coll (map-indexed (fn [i f]
                                     #(do (println "Waiting")
                                          (a/<!! (a/timeout (* 1000 i sec)))
                                          (f))) fns)]
    (a/pipeline-blocking concurrent
                         output-chan
                         (map (fn [f] (f)))
                         (a/to-chan timedout-coll))
    (a/<!! (a/into [] output-chan))))

如果您的objective要绕过速率限制器,您可以考虑在异步通道中实现它。下面是一个示例实现 - 该函数采用一个通道,使用基于令牌的限制器限制其输入并将其通过管道传输到输出通道。

(require '[clojure.core.async :as async])

(defn rate-limiting-ch [input xf rate]
  (let [tokens (numerator rate)
        period (denominator rate)
        ans    (async/chan tokens xf)
        next   (fn [] (+ period (System/currentTimeMillis)))]
    (async/go-loop [c tokens
                    t (next)]
      (if (zero? c)
        (do
          (async/<! (async/timeout (- t (System/currentTimeMillis))))
          (recur tokens (next)))
        (when-let [x (async/<! input)]
          (async/>! ans x)
          (recur (dec c) t))))
    ans))

这是一个示例用法:

(let [start  (System/currentTimeMillis)
      input  (async/to-chan (range 10))
      output (rate-limiting-ch input
                               ;; simulate an api call with roundtrip time of ~300ms
                               (map #(let [wait (rand-int 300)
                                           ans  {:time  (- (System/currentTimeMillis) start)
                                                 :wait  wait
                                                 :input %}]
                                       (Thread/sleep wait)
                                       ans))
                               ;; rate limited to 2 calls per 1000ms
                               2/1000)]
  ;; consume the output
  (async/go-loop []
    (when-let [x (async/<! output)]
      (println x)
      (recur))))

输出:

{:time 4, :wait 63, :input 0}
{:time 68, :wait 160, :input 1}
{:time 1003, :wait 74, :input 2}
{:time 1079, :wait 151, :input 3}
{:time 2003, :wait 165, :input 4}
{:time 2169, :wait 182, :input 5}
{:time 3003, :wait 5, :input 6}
{:time 3009, :wait 18, :input 7}
{:time 4007, :wait 138, :input 8}
{:time 4149, :wait 229, :input 9}