如何使用 Clojure Core Async 安排相隔 `n` 秒的函数列表
How to schedule a list of functions `n` seconds apart with Clojure Core Async
在我的 Clojure 项目中,我试图列出对 API 的 http 调用列表,该列表具有限速器,每分钟仅允许 n
次调用。我希望在所有 http 调用完成后对每个响应进行 returned 以进行进一步处理。我是 Clojure 的 Core Async 的新手,但认为它很合适,但因为我需要 运行 每次调用 n
秒,所以我也在尝试使用 Chime 库。在 Chime 的库中,它有使用 Core Async 的示例,但这些示例在每个时间间隔都调用相同的函数,这不适用于此用例。
虽然可能有一种使用 chime-async
的方法可以更好地服务于这个用例,但我的所有尝试都失败了,所以我尝试简单地用核心异步包装 Chime 调用,但我可能Core Async 比 Chime 更令人困惑。
这是我名字的例子space。
(ns mp.util.schedule
(:require [chime.core :as chime]
[clojure.core.async :as a]
[tick.alpha.api :as tick]))
(defn schedule-fns
"Takes a list of functions and a duration in seconds then runs each function in the list `sec` seconds apart
optionally provide an inst to start from"
[fs sec & [{:keys [inst] :or {inst (tick/now)}}]]
(let [ch (a/chan (count fs))
chime-times (map-indexed
(fn mapped-fn [i f]
(a/put! ch (chime/chime-at [(.plusSeconds inst (* i sec))]
(fn wrapped-fn [_] (f)))))
fs)]
(doseq [chi chime-times]
(a/<!! chi))))
; === Test Code ===
; simple test function
(defn sim-fn
"simple function that prints a message and value, then returns the value"
[v m]
(println m :at (tick/now))
v)
; list of test functions
(def fns [#(sim-fn 1 :one)
#(sim-fn 2 :two)
#(sim-fn 3 :three)])
调用 (schedule-fns fns 2)
时我想要发生的是每个函数在 fns
到 运行 n
秒之间以及 schedule-fns
到return (1 2 3)
(函数的 return 值),但这不是它正在做的。它在正确的时间调用每个函数(我可以从日志语句中看到)但它没有 returning 任何东西并且有一个我不明白的错误。我得到:
(schedule-fns fns 2)
:one :at #time/instant "2021-03-05T23:31:52.565Z"
Execution error (IllegalArgumentException) at clojure.core.async.impl.protocols/eval11496$fn$G (protocols.clj:15).
No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: java.lang.Boolean
:two :at #time/instant "2021-03-05T23:31:54.568Z"
:three :at #time/instant "2021-03-05T23:31:56.569Z"
如果我能得到帮助让我的代码正确使用 Core Async(有或没有 Chime),我将不胜感激。谢谢。
试试这个:
(defn sim-fn
"simple function that prints a message and value, then returns the value"
[v m]
(println m)
v)
; list of test functions
(def fns [#(sim-fn 1 :one)
#(sim-fn 2 :two)
#(sim-fn 3 :three)])
(defn schedule-fns [fns sec]
(let [program (interpose #(Thread/sleep (* sec 1000))
fns)]
(remove #(= % nil)
(for [p program]
(p)))))
然后调用:
> (schedule-fns fns 2)
:one
:two
:three
=> (1 2 3)
我想出了一个方法来获得我想要的...但有一些注意事项。
(def results (atom []))
(defn schedule-fns
"Takes a list of functions and a duration in seconds then runs each function in the list `sec` seconds apart
optionally provide an inst to start from"
[fs sec]
(let [ch (chan (count fs))]
(go-loop []
(swap! results conj (<! ch))
(recur))
(map-indexed (fn [i f]
(println :waiting (* i sec) :seconds)
(go (<! (timeout (* i sec 1000)))
(>! ch (f))))
fs)))
此代码具有我想要的时间和行为,但我必须使用 atom
来存储响应。虽然我可以添加一个观察者来确定所有结果何时出现,但我仍然觉得我不应该这样做。
我想我现在会使用它,但在某些时候我会继续努力,如果有人有比这种方法更好的东西,我很乐意看到它。
我有几个朋友看过这个,他们每个人都给出了不同的答案。这些肯定比我做的要好。
(defn schedule-fns [fs secs]
(let [ret (atom {})
sink (a/chan)]
(doseq [[n f] (map-indexed vector fs)]
(a/thread (a/<!! (a/timeout (* 1000 n secs)))
(let [val (f)
this-ret (swap! ret assoc n val)]
(when (= (count fs) (count this-ret))
(a/>!! sink (mapv (fn [i] (get this-ret i)) (range (count fs))))))))
(a/<!! sink)))
和
(defn schedule-fns
[fns sec]
(let [concurrent (count fns)
output-chan (a/chan)
timedout-coll (map-indexed (fn [i f]
#(do (println "Waiting")
(a/<!! (a/timeout (* 1000 i sec)))
(f))) fns)]
(a/pipeline-blocking concurrent
output-chan
(map (fn [f] (f)))
(a/to-chan timedout-coll))
(a/<!! (a/into [] output-chan))))
如果您的objective要绕过速率限制器,您可以考虑在异步通道中实现它。下面是一个示例实现 - 该函数采用一个通道,使用基于令牌的限制器限制其输入并将其通过管道传输到输出通道。
(require '[clojure.core.async :as async])
(defn rate-limiting-ch [input xf rate]
(let [tokens (numerator rate)
period (denominator rate)
ans (async/chan tokens xf)
next (fn [] (+ period (System/currentTimeMillis)))]
(async/go-loop [c tokens
t (next)]
(if (zero? c)
(do
(async/<! (async/timeout (- t (System/currentTimeMillis))))
(recur tokens (next)))
(when-let [x (async/<! input)]
(async/>! ans x)
(recur (dec c) t))))
ans))
这是一个示例用法:
(let [start (System/currentTimeMillis)
input (async/to-chan (range 10))
output (rate-limiting-ch input
;; simulate an api call with roundtrip time of ~300ms
(map #(let [wait (rand-int 300)
ans {:time (- (System/currentTimeMillis) start)
:wait wait
:input %}]
(Thread/sleep wait)
ans))
;; rate limited to 2 calls per 1000ms
2/1000)]
;; consume the output
(async/go-loop []
(when-let [x (async/<! output)]
(println x)
(recur))))
输出:
{:time 4, :wait 63, :input 0}
{:time 68, :wait 160, :input 1}
{:time 1003, :wait 74, :input 2}
{:time 1079, :wait 151, :input 3}
{:time 2003, :wait 165, :input 4}
{:time 2169, :wait 182, :input 5}
{:time 3003, :wait 5, :input 6}
{:time 3009, :wait 18, :input 7}
{:time 4007, :wait 138, :input 8}
{:time 4149, :wait 229, :input 9}
在我的 Clojure 项目中,我试图列出对 API 的 http 调用列表,该列表具有限速器,每分钟仅允许 n
次调用。我希望在所有 http 调用完成后对每个响应进行 returned 以进行进一步处理。我是 Clojure 的 Core Async 的新手,但认为它很合适,但因为我需要 运行 每次调用 n
秒,所以我也在尝试使用 Chime 库。在 Chime 的库中,它有使用 Core Async 的示例,但这些示例在每个时间间隔都调用相同的函数,这不适用于此用例。
虽然可能有一种使用 chime-async
的方法可以更好地服务于这个用例,但我的所有尝试都失败了,所以我尝试简单地用核心异步包装 Chime 调用,但我可能Core Async 比 Chime 更令人困惑。
这是我名字的例子space。
(ns mp.util.schedule
(:require [chime.core :as chime]
[clojure.core.async :as a]
[tick.alpha.api :as tick]))
(defn schedule-fns
"Takes a list of functions and a duration in seconds then runs each function in the list `sec` seconds apart
optionally provide an inst to start from"
[fs sec & [{:keys [inst] :or {inst (tick/now)}}]]
(let [ch (a/chan (count fs))
chime-times (map-indexed
(fn mapped-fn [i f]
(a/put! ch (chime/chime-at [(.plusSeconds inst (* i sec))]
(fn wrapped-fn [_] (f)))))
fs)]
(doseq [chi chime-times]
(a/<!! chi))))
; === Test Code ===
; simple test function
(defn sim-fn
"simple function that prints a message and value, then returns the value"
[v m]
(println m :at (tick/now))
v)
; list of test functions
(def fns [#(sim-fn 1 :one)
#(sim-fn 2 :two)
#(sim-fn 3 :three)])
调用 (schedule-fns fns 2)
时我想要发生的是每个函数在 fns
到 运行 n
秒之间以及 schedule-fns
到return (1 2 3)
(函数的 return 值),但这不是它正在做的。它在正确的时间调用每个函数(我可以从日志语句中看到)但它没有 returning 任何东西并且有一个我不明白的错误。我得到:
(schedule-fns fns 2)
:one :at #time/instant "2021-03-05T23:31:52.565Z"
Execution error (IllegalArgumentException) at clojure.core.async.impl.protocols/eval11496$fn$G (protocols.clj:15).
No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: java.lang.Boolean
:two :at #time/instant "2021-03-05T23:31:54.568Z"
:three :at #time/instant "2021-03-05T23:31:56.569Z"
如果我能得到帮助让我的代码正确使用 Core Async(有或没有 Chime),我将不胜感激。谢谢。
试试这个:
(defn sim-fn
"simple function that prints a message and value, then returns the value"
[v m]
(println m)
v)
; list of test functions
(def fns [#(sim-fn 1 :one)
#(sim-fn 2 :two)
#(sim-fn 3 :three)])
(defn schedule-fns [fns sec]
(let [program (interpose #(Thread/sleep (* sec 1000))
fns)]
(remove #(= % nil)
(for [p program]
(p)))))
然后调用:
> (schedule-fns fns 2)
:one
:two
:three
=> (1 2 3)
我想出了一个方法来获得我想要的...但有一些注意事项。
(def results (atom []))
(defn schedule-fns
"Takes a list of functions and a duration in seconds then runs each function in the list `sec` seconds apart
optionally provide an inst to start from"
[fs sec]
(let [ch (chan (count fs))]
(go-loop []
(swap! results conj (<! ch))
(recur))
(map-indexed (fn [i f]
(println :waiting (* i sec) :seconds)
(go (<! (timeout (* i sec 1000)))
(>! ch (f))))
fs)))
此代码具有我想要的时间和行为,但我必须使用 atom
来存储响应。虽然我可以添加一个观察者来确定所有结果何时出现,但我仍然觉得我不应该这样做。
我想我现在会使用它,但在某些时候我会继续努力,如果有人有比这种方法更好的东西,我很乐意看到它。
我有几个朋友看过这个,他们每个人都给出了不同的答案。这些肯定比我做的要好。
(defn schedule-fns [fs secs]
(let [ret (atom {})
sink (a/chan)]
(doseq [[n f] (map-indexed vector fs)]
(a/thread (a/<!! (a/timeout (* 1000 n secs)))
(let [val (f)
this-ret (swap! ret assoc n val)]
(when (= (count fs) (count this-ret))
(a/>!! sink (mapv (fn [i] (get this-ret i)) (range (count fs))))))))
(a/<!! sink)))
和
(defn schedule-fns
[fns sec]
(let [concurrent (count fns)
output-chan (a/chan)
timedout-coll (map-indexed (fn [i f]
#(do (println "Waiting")
(a/<!! (a/timeout (* 1000 i sec)))
(f))) fns)]
(a/pipeline-blocking concurrent
output-chan
(map (fn [f] (f)))
(a/to-chan timedout-coll))
(a/<!! (a/into [] output-chan))))
如果您的objective要绕过速率限制器,您可以考虑在异步通道中实现它。下面是一个示例实现 - 该函数采用一个通道,使用基于令牌的限制器限制其输入并将其通过管道传输到输出通道。
(require '[clojure.core.async :as async])
(defn rate-limiting-ch [input xf rate]
(let [tokens (numerator rate)
period (denominator rate)
ans (async/chan tokens xf)
next (fn [] (+ period (System/currentTimeMillis)))]
(async/go-loop [c tokens
t (next)]
(if (zero? c)
(do
(async/<! (async/timeout (- t (System/currentTimeMillis))))
(recur tokens (next)))
(when-let [x (async/<! input)]
(async/>! ans x)
(recur (dec c) t))))
ans))
这是一个示例用法:
(let [start (System/currentTimeMillis)
input (async/to-chan (range 10))
output (rate-limiting-ch input
;; simulate an api call with roundtrip time of ~300ms
(map #(let [wait (rand-int 300)
ans {:time (- (System/currentTimeMillis) start)
:wait wait
:input %}]
(Thread/sleep wait)
ans))
;; rate limited to 2 calls per 1000ms
2/1000)]
;; consume the output
(async/go-loop []
(when-let [x (async/<! output)]
(println x)
(recur))))
输出:
{:time 4, :wait 63, :input 0}
{:time 68, :wait 160, :input 1}
{:time 1003, :wait 74, :input 2}
{:time 1079, :wait 151, :input 3}
{:time 2003, :wait 165, :input 4}
{:time 2169, :wait 182, :input 5}
{:time 3003, :wait 5, :input 6}
{:time 3009, :wait 18, :input 7}
{:time 4007, :wait 138, :input 8}
{:time 4149, :wait 229, :input 9}