Clojure async/go 如何停放阻塞代码
Clojure async/go how to park blocking code
我使用一些 Java 库来发出非异步获取和 post 请求。我曾经将此类请求包装到期货中,它为我解决了 "waiting problem"(我的意思是等待响应)
(defn unchangeable-lib-request [n]
(Thread/sleep 1000)
n)
(defn process [n]
(let [res (atom [])]
(dotimes [i n]
(future (swap! res conj (unchangeable-lib-request i))))
(loop []
(if (> n (count @res))
(recur)
@res))))
(time (process 9))
;; "Elapsed time: 1000.639079 msecs"
;; => [8 7 5 6 4 3 2 1 0]
但我需要创建数百个请求,这会产生性能问题。我发现了 core.async 并开始阻止。但是如果我在这个库中使用 go-blocks,它不会解决 "waiting problem"
(defn unchangeable-lib-request [n]
(Thread/sleep 1000)
n)
(defn process [n]
(let [c (async/chan 10)]
(dotimes [i n]
(async/go
(async/>! c (unchangeable-lib-request i))))
(loop [result []]
(if (> n (count result))
(recur (conj result (async/<!! c)))
result))))
(time (process 9))
;; "Elapsed time: 2001.770183 msecs"
;; => [0 4 1 6 7 2 5 3 8]
Go 块只能同时处理 8 个请求。是否有可能编写一些异步包装器来停放 go-block 并提供异步发出 100 多个请求而不会相互阻塞的能力?
(defn process [n]
(let [c (async/chan 10)]
(dotimes [i n]
(async/go
(async/>! c (magic-async-parking-wrapper
(unchangeable-lib-request i))))
(loop [result []]
(if (> n (count result))
(recur (conj result (async/<!! c)))
result))))
(time (process 9))
;; "Elapsed time: 1003.2563 msecs"
我知道 async/thread 但似乎这与 (future ...) 相同。
可能吗?
我建议:
- 使用 futures 创建线程,并让它们使用
put!
从任何 go 块外部将结果放回核心异步通道,例如:(future (put! chan (worker-function)))
- 然后使用 go 块在该(单个)通道上等待,在获得结果时输入结果。
这是你使用的地方clojure.core.async/pipeline-blocking
(require '[clojure.core.async :as a :refer [chan pipeline-blocking]])
(let [output-chan (chan 100)
input-chan (chan 1000)]
(pipeline-blocking 4 ; parallelism knob
output-chan
(map unchangeable-lib-request)
input-chan)
;; Consume results from output-chan, put operations on input-chan
[output-chan input-chan]
)
这会生成 n 个(在本例中为 4 个)线程,这些线程一直忙于执行 unchangeable-lib-request
。
使用 output-chan
的缓冲区大小来微调您希望提前发生的请求数量。
使用 input-chan
的缓冲区大小来微调您希望在没有反向传播的情况下安排的请求数量(阻塞 input-chan
)。
我使用一些 Java 库来发出非异步获取和 post 请求。我曾经将此类请求包装到期货中,它为我解决了 "waiting problem"(我的意思是等待响应)
(defn unchangeable-lib-request [n]
(Thread/sleep 1000)
n)
(defn process [n]
(let [res (atom [])]
(dotimes [i n]
(future (swap! res conj (unchangeable-lib-request i))))
(loop []
(if (> n (count @res))
(recur)
@res))))
(time (process 9))
;; "Elapsed time: 1000.639079 msecs"
;; => [8 7 5 6 4 3 2 1 0]
但我需要创建数百个请求,这会产生性能问题。我发现了 core.async 并开始阻止。但是如果我在这个库中使用 go-blocks,它不会解决 "waiting problem"
(defn unchangeable-lib-request [n]
(Thread/sleep 1000)
n)
(defn process [n]
(let [c (async/chan 10)]
(dotimes [i n]
(async/go
(async/>! c (unchangeable-lib-request i))))
(loop [result []]
(if (> n (count result))
(recur (conj result (async/<!! c)))
result))))
(time (process 9))
;; "Elapsed time: 2001.770183 msecs"
;; => [0 4 1 6 7 2 5 3 8]
Go 块只能同时处理 8 个请求。是否有可能编写一些异步包装器来停放 go-block 并提供异步发出 100 多个请求而不会相互阻塞的能力?
(defn process [n]
(let [c (async/chan 10)]
(dotimes [i n]
(async/go
(async/>! c (magic-async-parking-wrapper
(unchangeable-lib-request i))))
(loop [result []]
(if (> n (count result))
(recur (conj result (async/<!! c)))
result))))
(time (process 9))
;; "Elapsed time: 1003.2563 msecs"
我知道 async/thread 但似乎这与 (future ...) 相同。
可能吗?
我建议:
- 使用 futures 创建线程,并让它们使用
put!
从任何 go 块外部将结果放回核心异步通道,例如:(future (put! chan (worker-function)))
- 然后使用 go 块在该(单个)通道上等待,在获得结果时输入结果。
这是你使用的地方clojure.core.async/pipeline-blocking
(require '[clojure.core.async :as a :refer [chan pipeline-blocking]])
(let [output-chan (chan 100)
input-chan (chan 1000)]
(pipeline-blocking 4 ; parallelism knob
output-chan
(map unchangeable-lib-request)
input-chan)
;; Consume results from output-chan, put operations on input-chan
[output-chan input-chan]
)
这会生成 n 个(在本例中为 4 个)线程,这些线程一直忙于执行 unchangeable-lib-request
。
使用 output-chan
的缓冲区大小来微调您希望提前发生的请求数量。
使用 input-chan
的缓冲区大小来微调您希望在没有反向传播的情况下安排的请求数量(阻塞 input-chan
)。