Clojurescript:使用 core/async 个通道分块处理请求
Clojurescript: process requests in chunks with core/async channels
我有以下场景:
我使用一些服务来检索一些数据并将其传递给我的输入。
有了一些输入参数,我需要针对上述服务执行 N 个请求,收集输出并为每个输出执行一些 CPU 繁重的任务。
我正在尝试使用 core/async 个渠道来实现这一目标。
这是我的尝试(示意性地),它有点管用,但它并不像我希望的那样。
对于如何改进它的任何提示,我们将不胜感激。
(defn produce-inputs
[in-chan inputs]
(let input-names-seq (map #(:name %) inputs)]
(doseq [input-name input-names-seq]
(async/go
(async/>! in-chan input-name)))))
(defn consume
[inputs]
(let [in-chan (async/chan 1)
out-chan (async/chan 1)]
(do
(produce-inputs in-chan inputs)
(async/go-loop []
(let [input-name (async/<! in-chan)]
(do
(retrieve-resource-from-service input-name
; response handler
(fn [resp]
(async/go
(let [result (:result resp)]
(async/>! out-chan result)))))
(when input-name
(recur)))))
; read from out-chan and do some heavy work for each entry
(async/go-loop []
(let [result (async/<! out-chan)]
(do-some-cpu-heavy-work result))))))
; entry point
(defn run
[inputs]
(consume inputs))
有没有什么方法可以更新它,以便每时每刻都不会有超过五个服务请求 (retrieve-resource-from-service
) 处于活动状态?
如果我的解释不清楚,请提问,我会更新。
您可以创建另一个通道作为令牌桶来限制您的请求速率。
See this link 使用令牌桶进行每秒速率限制的示例。
要限制同时请求的数量,您可以按照以下方式做一些事情:
(defn consume [inputs]
(let [in-chan (async/chan 1)
out-chan (async/chan 1)
bucket (async/chan 5)]
;; ...
(dotimes [_ 5] (async/put! bucket :token))
(async/go-loop []
(let [input-name (async/<! in-chan)
token (async/<! bucket)]
(retrieve-resource-from-service
input-name
; response handler
(fn [resp]
(async/go
(let [result (:result resp)]
(async/>! out-chan result)
(async/>! bucket token)))))
(when input-name
(recur))))
;; ...
))
创建了一个新频道bucket
,并向其中放入了五个项目。在发出请求之前,我们从桶中取出一个令牌,并在请求完成后将其放回原处。如果 bucket
通道中没有令牌,我们必须等到其中一个请求完成。
注意:这只是代码的草图,您可能需要更正它。特别是,如果您的 retrieve-resource-from-service
函数中有任何错误处理程序,您也应该在出现错误时放回令牌,以避免最终出现死锁。
我有以下场景:
我使用一些服务来检索一些数据并将其传递给我的输入。
有了一些输入参数,我需要针对上述服务执行 N 个请求,收集输出并为每个输出执行一些 CPU 繁重的任务。
我正在尝试使用 core/async 个渠道来实现这一目标。
这是我的尝试(示意性地),它有点管用,但它并不像我希望的那样。 对于如何改进它的任何提示,我们将不胜感激。
(defn produce-inputs
[in-chan inputs]
(let input-names-seq (map #(:name %) inputs)]
(doseq [input-name input-names-seq]
(async/go
(async/>! in-chan input-name)))))
(defn consume
[inputs]
(let [in-chan (async/chan 1)
out-chan (async/chan 1)]
(do
(produce-inputs in-chan inputs)
(async/go-loop []
(let [input-name (async/<! in-chan)]
(do
(retrieve-resource-from-service input-name
; response handler
(fn [resp]
(async/go
(let [result (:result resp)]
(async/>! out-chan result)))))
(when input-name
(recur)))))
; read from out-chan and do some heavy work for each entry
(async/go-loop []
(let [result (async/<! out-chan)]
(do-some-cpu-heavy-work result))))))
; entry point
(defn run
[inputs]
(consume inputs))
有没有什么方法可以更新它,以便每时每刻都不会有超过五个服务请求 (retrieve-resource-from-service
) 处于活动状态?
如果我的解释不清楚,请提问,我会更新。
您可以创建另一个通道作为令牌桶来限制您的请求速率。
See this link 使用令牌桶进行每秒速率限制的示例。
要限制同时请求的数量,您可以按照以下方式做一些事情:
(defn consume [inputs]
(let [in-chan (async/chan 1)
out-chan (async/chan 1)
bucket (async/chan 5)]
;; ...
(dotimes [_ 5] (async/put! bucket :token))
(async/go-loop []
(let [input-name (async/<! in-chan)
token (async/<! bucket)]
(retrieve-resource-from-service
input-name
; response handler
(fn [resp]
(async/go
(let [result (:result resp)]
(async/>! out-chan result)
(async/>! bucket token)))))
(when input-name
(recur))))
;; ...
))
创建了一个新频道bucket
,并向其中放入了五个项目。在发出请求之前,我们从桶中取出一个令牌,并在请求完成后将其放回原处。如果 bucket
通道中没有令牌,我们必须等到其中一个请求完成。
注意:这只是代码的草图,您可能需要更正它。特别是,如果您的 retrieve-resource-from-service
函数中有任何错误处理程序,您也应该在出现错误时放回令牌,以避免最终出现死锁。