在并发 http-kit/get 个实例中使用 i/o 回调的最简单方法
easiest way to use a i/o callback within concurrent http-kit/get instances
我正在启动数百个并发 http-kit.client/get
请求,并提供回调以将结果写入单个文件。
处理线程安全的好方法是什么?使用 chan
和 <!!
来自 core.asyc
?
这是我会考虑的代码:
(defn launch-async [channel url]
(http/get url {:timeout 5000
:user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:10.0) Gecko/20100101 Firefox/10.0"}
(fn [{:keys [status headers body error]}]
(if error
(put! channel (json/generate-string {:url url :headers headers :status status}))
(put! channel (json/generate-string body))))))
(defn process-async [channel func]
(when-let [response (<!! channel)]
(func response)))
(defn http-gets-async [func urls]
(let [channel (chan)]
(doall (map #(launch-async channel %) urls))
(process-async channel func)))
感谢您的见解。
这很简单,我不会使用 core.async。您可以使用一个原子存储来执行此操作,使用一个响应向量,然后让一个单独的线程读取原子的内容,直到它看到所有响应。然后,在您的 http-kit 回调中,您可以直接 swap!
将响应直接放入原子中。
如果您确实想使用 core.async,我建议您使用缓冲通道来避免阻塞您的 http-kit 线程池。
由于您已经在您的示例中使用了 core.async,我想我会指出一些问题以及您可以如何解决这些问题。另一个答案提到使用更基本的方法,我完全同意更简单的方法就好了。但是,使用通道,您可以使用一种简单的方式来使用数据,而无需在向量上进行映射,如果您有很多响应,向量也会随着时间的推移而变大。考虑以下问题以及我们如何解决它们:
(1) 如果您的 url 列表包含超过 1024 个元素,您当前的版本将会崩溃。有一个用于异步放置和获取的内部缓冲区(即,put!
和 take!
不会阻塞,但总是立即 return),并且限制为 1024。这是为了防止无限异步使用通道。要亲自查看,请致电 (http-gets-async println (repeat 1025 "http://blah-blah-asdf-fakedomain.com"))
.
你想做的是只在有空间的时候在频道上放一些东西。这称为背压。从 go block best practices 上的优秀 wiki 中获取一个页面,从你的 http-kit 回调中执行此操作的一种聪明方法是使用 put!
回调选项来启动你的下一个 http get;这只会在 put!
立即成功时发生,因此您永远不会遇到可以超出通道缓冲区的情况:
(defn launch-async
[channel [url & urls]]
(when url
(http/get url {:timeout 5000
:user-agent "Mozilla"}
(fn [{:keys [status headers body error]}]
(let [put-on-chan (if error
(json/generate-string {:url url :headers headers :status status})
(json/generate-string body))]
(put! channel put-on-chan (fn [_] (launch-async channel urls))))))))
(2) 接下来,您似乎只处理一个响应。相反,使用 go-loop:
(defn process-async
[channel func]
(go-loop []
(when-let [response (<! channel)]
(func response)
(recur))))
(3) 这是您的 http-gets-async
函数。我认为在此处添加缓冲区没有什么坏处,因为它应该可以帮助您在开始时触发大量请求:
(defn http-gets-async
[func urls]
(let [channel (chan 1000)]
(launch-async channel urls)
(process-async channel func)))
现在,您可以使用背压处理无限数量的 urls。要对此进行测试,请定义一个计数器,然后让您的处理函数递增该计数器以查看您的进度。使用易于攻击的本地主机 URL(不建议向 google 等发出数十万个请求):
(def responses (atom 0))
(http-gets-async (fn [_] (swap! responses inc))
(repeat 1000000 "http://localhost:8000"))
因为这都是异步的,您的函数将 return 立即并且您可以看到 @responses
增长。
您可以做的另一件有趣的事情是代替 运行 您在 process-async
中的处理函数,您可以选择将其用作通道本身的传感器。
(defn process-async
[channel]
(go-loop []
(when-let [_ (<! channel)]
(recur))))
(defn http-gets-async
[func urls]
(let [channel (chan 10000 (map func))] ;; <-- transducer on channel
(launch-async channel urls)
(process-async channel)))
有很多方法可以做到这一点,包括构造它以便通道关闭(请注意,在上面,它保持打开状态)。如果愿意,您可以使用 java.util.concurrent
原语来帮助解决这方面的问题,而且它们非常易于使用。可能性非常多。
我正在启动数百个并发 http-kit.client/get
请求,并提供回调以将结果写入单个文件。
处理线程安全的好方法是什么?使用 chan
和 <!!
来自 core.asyc
?
这是我会考虑的代码:
(defn launch-async [channel url]
(http/get url {:timeout 5000
:user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:10.0) Gecko/20100101 Firefox/10.0"}
(fn [{:keys [status headers body error]}]
(if error
(put! channel (json/generate-string {:url url :headers headers :status status}))
(put! channel (json/generate-string body))))))
(defn process-async [channel func]
(when-let [response (<!! channel)]
(func response)))
(defn http-gets-async [func urls]
(let [channel (chan)]
(doall (map #(launch-async channel %) urls))
(process-async channel func)))
感谢您的见解。
这很简单,我不会使用 core.async。您可以使用一个原子存储来执行此操作,使用一个响应向量,然后让一个单独的线程读取原子的内容,直到它看到所有响应。然后,在您的 http-kit 回调中,您可以直接 swap!
将响应直接放入原子中。
如果您确实想使用 core.async,我建议您使用缓冲通道来避免阻塞您的 http-kit 线程池。
由于您已经在您的示例中使用了 core.async,我想我会指出一些问题以及您可以如何解决这些问题。另一个答案提到使用更基本的方法,我完全同意更简单的方法就好了。但是,使用通道,您可以使用一种简单的方式来使用数据,而无需在向量上进行映射,如果您有很多响应,向量也会随着时间的推移而变大。考虑以下问题以及我们如何解决它们:
(1) 如果您的 url 列表包含超过 1024 个元素,您当前的版本将会崩溃。有一个用于异步放置和获取的内部缓冲区(即,put!
和 take!
不会阻塞,但总是立即 return),并且限制为 1024。这是为了防止无限异步使用通道。要亲自查看,请致电 (http-gets-async println (repeat 1025 "http://blah-blah-asdf-fakedomain.com"))
.
你想做的是只在有空间的时候在频道上放一些东西。这称为背压。从 go block best practices 上的优秀 wiki 中获取一个页面,从你的 http-kit 回调中执行此操作的一种聪明方法是使用 put!
回调选项来启动你的下一个 http get;这只会在 put!
立即成功时发生,因此您永远不会遇到可以超出通道缓冲区的情况:
(defn launch-async
[channel [url & urls]]
(when url
(http/get url {:timeout 5000
:user-agent "Mozilla"}
(fn [{:keys [status headers body error]}]
(let [put-on-chan (if error
(json/generate-string {:url url :headers headers :status status})
(json/generate-string body))]
(put! channel put-on-chan (fn [_] (launch-async channel urls))))))))
(2) 接下来,您似乎只处理一个响应。相反,使用 go-loop:
(defn process-async
[channel func]
(go-loop []
(when-let [response (<! channel)]
(func response)
(recur))))
(3) 这是您的 http-gets-async
函数。我认为在此处添加缓冲区没有什么坏处,因为它应该可以帮助您在开始时触发大量请求:
(defn http-gets-async
[func urls]
(let [channel (chan 1000)]
(launch-async channel urls)
(process-async channel func)))
现在,您可以使用背压处理无限数量的 urls。要对此进行测试,请定义一个计数器,然后让您的处理函数递增该计数器以查看您的进度。使用易于攻击的本地主机 URL(不建议向 google 等发出数十万个请求):
(def responses (atom 0))
(http-gets-async (fn [_] (swap! responses inc))
(repeat 1000000 "http://localhost:8000"))
因为这都是异步的,您的函数将 return 立即并且您可以看到 @responses
增长。
您可以做的另一件有趣的事情是代替 运行 您在 process-async
中的处理函数,您可以选择将其用作通道本身的传感器。
(defn process-async
[channel]
(go-loop []
(when-let [_ (<! channel)]
(recur))))
(defn http-gets-async
[func urls]
(let [channel (chan 10000 (map func))] ;; <-- transducer on channel
(launch-async channel urls)
(process-async channel)))
有很多方法可以做到这一点,包括构造它以便通道关闭(请注意,在上面,它保持打开状态)。如果愿意,您可以使用 java.util.concurrent
原语来帮助解决这方面的问题,而且它们非常易于使用。可能性非常多。