确保在完成所有请求后关闭 clj-http 的连接管理器的正确方法
Proper way to ensure clj-http's connection manager is closed after all requests are done
我的代码是 clj-http
、core.async
设施和 atom
的组合。它创建了一些线程来获取和解析一堆页面:
(defn fetch-page
([url] (fetch-page url nil))
([url conn-manager]
(-> (http.client/get url {:connection-manager conn-manager})
:body hickory/parse hickory/as-hickory)))
(defn- create-worker
[url-chan result conn-manager]
(async/thread
(loop [url (async/<!! url-chan)]
(when url
(swap! result assoc url (fetch-page url conn-manager))
(recur (async/<!! url-chan))))))
(defn fetch-pages
[urls]
(let [url-chan (async/to-chan urls)
pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
conn-manager (http.conn-mgr/make-reusable-conn-manager {})
workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
(range n-cpus))]
; wait for workers to finish and shut conn-manager down
(dotimes [_ n-cpus] (async/alts!! workers))
(http.conn-mgr/shutdown-manager conn-manager)
(mapv #(get @pages %) urls)))
想法是使用多线程来减少获取和解析页面的时间,但我想不要使服务器过载,发送大量请求一次 - 这就是使用连接管理器的原因。不知道我的做法对不对,欢迎指教。目前的问题是最后的请求失败,因为连接管理器在它们终止之前关闭:Exception in thread "async-thread-macro-15" java.lang.IllegalStateException: Connection pool shut down
.
主要问题:如何在适当的时候关闭连接管理器(以及为什么我当前的代码无法执行此操作)?支线任务:我的方法对吗?如果没有,我该怎么做才能一次获取和解析多个页面,同时又不会使服务器过载?
谢谢!
问题是 async/alts!!
returns 在第一个结果上(并且会继续这样做,因为 workers
永远不会改变)。我认为使用 async/merge
建立一个频道然后反复读取它应该可行。
(defn fetch-pages
[urls]
(let [url-chan (async/to-chan urls)
pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
conn-manager (http.conn-mgr/make-reusable-conn-manager {})
workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
(range n-cpus))
all-workers (async/merge workers)]
; wait for workers to finish and shut conn-manager down
(dotimes [_ n-cpus] (async/<!! all-workers))
(http.conn-mgr/shutdown-manager conn-manager)
(mapv #(get @pages %) urls)))
或者,您可以重复并继续收缩 workers
,这样您就只在等待以前未完成的工人。
(defn fetch-pages
[urls]
(let [url-chan (async/to-chan urls)
pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
conn-manager (http.conn-mgr/make-reusable-conn-manager {})
workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
(range n-cpus))]
; wait for workers to finish and shut conn-manager down
(loop [workers workers]
(when (seq workers)
(let [[_ finished-worker] (async/alts!! workers)]
(recur (filterv #(not= finished-worker %) workers)))))
(http.conn-mgr/shutdown-manager conn-manager)
(mapv #(get @pages %) urls)))
我相信亚历杭德罗对你的错误原因是正确的,这是合乎逻辑的,因为你的错误表明你在所有请求完成之前关闭了连接管理器,所以很可能所有的工作人员都没有当你关闭它时完成。
我将提出的另一个解决方案源于这样一个事实,即您实际上并没有在 create-worker
线程中执行任何需要它成为通道的操作,而通道是由 async/thread
隐式创建的。因此,您可以将其替换为 future
,如下所示:
(defn- create-worker
[url-chan result conn-manager]
(future
(loop [url (a/<!! url-chan)]
(when url
(swap! result assoc url (fetch-page url conn-manager))
(recur (a/<!! url-chan))))))
并且在您的 fetch-pages
函数中,"join" 通过取消引用:
(doseq [worker workers]
@worker) ; alternatively, use deref to specify timeout
这消除了很多 core.async 干扰,这些干扰本来就不是 core.async 问题。这当然取决于您是否保持原样收集数据的方法,即在原子上使用 swap!
来跟踪页面数据。如果您要将 fetch-page
的结果发送到 return 频道或类似的东西,那么您需要保持当前的 thread
方法。
关于您对服务器过载的担忧 -- 您尚未定义它对 "overload" 服务器意味着什么。这个有两个维度:一个是请求的rate(比如每秒的请求数),另一个是concurrent[=34]的数量=] 请求。您当前的应用程序有 n
个工作线程,这是有效的并发性(连同连接管理器中的设置)。但这对解决每秒请求率没有任何作用。
这比看起来要复杂一点,但这是可能的。您必须考虑每单位时间所有线程完成的所有请求的总数,并且管理这不是一个答案可以解决的问题。我建议你做一些关于节流和速率限制的研究,试一试,然后带着问题从那里开始。
我的代码是 clj-http
、core.async
设施和 atom
的组合。它创建了一些线程来获取和解析一堆页面:
(defn fetch-page
([url] (fetch-page url nil))
([url conn-manager]
(-> (http.client/get url {:connection-manager conn-manager})
:body hickory/parse hickory/as-hickory)))
(defn- create-worker
[url-chan result conn-manager]
(async/thread
(loop [url (async/<!! url-chan)]
(when url
(swap! result assoc url (fetch-page url conn-manager))
(recur (async/<!! url-chan))))))
(defn fetch-pages
[urls]
(let [url-chan (async/to-chan urls)
pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
conn-manager (http.conn-mgr/make-reusable-conn-manager {})
workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
(range n-cpus))]
; wait for workers to finish and shut conn-manager down
(dotimes [_ n-cpus] (async/alts!! workers))
(http.conn-mgr/shutdown-manager conn-manager)
(mapv #(get @pages %) urls)))
想法是使用多线程来减少获取和解析页面的时间,但我想不要使服务器过载,发送大量请求一次 - 这就是使用连接管理器的原因。不知道我的做法对不对,欢迎指教。目前的问题是最后的请求失败,因为连接管理器在它们终止之前关闭:Exception in thread "async-thread-macro-15" java.lang.IllegalStateException: Connection pool shut down
.
主要问题:如何在适当的时候关闭连接管理器(以及为什么我当前的代码无法执行此操作)?支线任务:我的方法对吗?如果没有,我该怎么做才能一次获取和解析多个页面,同时又不会使服务器过载?
谢谢!
问题是 async/alts!!
returns 在第一个结果上(并且会继续这样做,因为 workers
永远不会改变)。我认为使用 async/merge
建立一个频道然后反复读取它应该可行。
(defn fetch-pages
[urls]
(let [url-chan (async/to-chan urls)
pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
conn-manager (http.conn-mgr/make-reusable-conn-manager {})
workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
(range n-cpus))
all-workers (async/merge workers)]
; wait for workers to finish and shut conn-manager down
(dotimes [_ n-cpus] (async/<!! all-workers))
(http.conn-mgr/shutdown-manager conn-manager)
(mapv #(get @pages %) urls)))
或者,您可以重复并继续收缩 workers
,这样您就只在等待以前未完成的工人。
(defn fetch-pages
[urls]
(let [url-chan (async/to-chan urls)
pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
conn-manager (http.conn-mgr/make-reusable-conn-manager {})
workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
(range n-cpus))]
; wait for workers to finish and shut conn-manager down
(loop [workers workers]
(when (seq workers)
(let [[_ finished-worker] (async/alts!! workers)]
(recur (filterv #(not= finished-worker %) workers)))))
(http.conn-mgr/shutdown-manager conn-manager)
(mapv #(get @pages %) urls)))
我相信亚历杭德罗对你的错误原因是正确的,这是合乎逻辑的,因为你的错误表明你在所有请求完成之前关闭了连接管理器,所以很可能所有的工作人员都没有当你关闭它时完成。
我将提出的另一个解决方案源于这样一个事实,即您实际上并没有在 create-worker
线程中执行任何需要它成为通道的操作,而通道是由 async/thread
隐式创建的。因此,您可以将其替换为 future
,如下所示:
(defn- create-worker
[url-chan result conn-manager]
(future
(loop [url (a/<!! url-chan)]
(when url
(swap! result assoc url (fetch-page url conn-manager))
(recur (a/<!! url-chan))))))
并且在您的 fetch-pages
函数中,"join" 通过取消引用:
(doseq [worker workers]
@worker) ; alternatively, use deref to specify timeout
这消除了很多 core.async 干扰,这些干扰本来就不是 core.async 问题。这当然取决于您是否保持原样收集数据的方法,即在原子上使用 swap!
来跟踪页面数据。如果您要将 fetch-page
的结果发送到 return 频道或类似的东西,那么您需要保持当前的 thread
方法。
关于您对服务器过载的担忧 -- 您尚未定义它对 "overload" 服务器意味着什么。这个有两个维度:一个是请求的rate(比如每秒的请求数),另一个是concurrent[=34]的数量=] 请求。您当前的应用程序有 n
个工作线程,这是有效的并发性(连同连接管理器中的设置)。但这对解决每秒请求率没有任何作用。
这比看起来要复杂一点,但这是可能的。您必须考虑每单位时间所有线程完成的所有请求的总数,并且管理这不是一个答案可以解决的问题。我建议你做一些关于节流和速率限制的研究,试一试,然后带着问题从那里开始。