在并发 http-kit/get 个实例中使用 i/o 回调的最简单方法

easiest way to use a i/o callback within concurrent http-kit/get instances

我正在启动数百个并发 http-kit.client/get 请求,并提供回调以将结果写入单个文件。

处理线程安全的好方法是什么?使用 chan<!! 来自 core.asyc?

这是我会考虑的代码:

(defn launch-async [channel url]                                                                                                                                
  (http/get url {:timeout 5000                                                                                                                                  
                 :user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:10.0) Gecko/20100101 Firefox/10.0"}                                              
          (fn [{:keys [status headers body error]}]                                                                                                             
            (if error                                                                                                                                           
              (put! channel (json/generate-string {:url url :headers headers :status status}))                                                                  
              (put! channel (json/generate-string body))))))                                                                                                    

(defn process-async [channel func]                                                                                                                              
  (when-let [response (<!! channel)]                                                                                                                            
    (func response)))                                                                                                                                           

(defn http-gets-async [func urls]                                                                                                                               
  (let [channel (chan)]                                                                                                                                         
    (doall (map #(launch-async channel %) urls))                                                                                                                
    (process-async channel func)))    

感谢您的见解。

这很简单,我不会使用 core.async。您可以使用一个原子存储来执行此操作,使用一个响应向量,然后让一个单独的线程读取原子的内容,直到它看到所有响应。然后,在您的 http-kit 回调中,您可以直接 swap! 将响应直接放入原子中。

如果您确实想使用 core.async,我建议您使用缓冲通道来避免阻塞您的 http-kit 线程池。

由于您已经在您的示例中使用了 core.async,我想我会指出一些问题以及您可以如何解决这些问题。另一个答案提到使用更基本的方法,我完全同意更简单的方法就好了。但是,使用通道,您可以使用一种简单的方式来使用数据,而无需在向量上进行映射,如果您有很多响应,向量也会随着时间的推移而变大。考虑以下问题以及我们如何解决它们:

(1) 如果您的 url 列表包含超过 1024 个元素,您当前的版本将会崩溃。有一个用于异步放置和获取的内部缓冲区(即,put!take! 不会阻塞,但总是立即 return),并且限制为 1024。这是为了防止无限异步使用通道。要亲自查看,请致电 (http-gets-async println (repeat 1025 "http://blah-blah-asdf-fakedomain.com")).

你想做的是只在有空间的时候在频道上放一些东西。这称为背压。从 go block best practices 上的优秀 wiki 中获取一个页面,从你的 http-kit 回调中执行此操作的一种聪明方法是使用 put! 回调选项来启动你的下一个 http get;这只会在 put! 立即成功时发生,因此您永远不会遇到可以超出通道缓冲区的情况:

(defn launch-async
  [channel [url & urls]]
  (when url
    (http/get url {:timeout 5000
                   :user-agent "Mozilla"}
              (fn [{:keys [status headers body error]}]
                (let [put-on-chan (if error
                                    (json/generate-string {:url url :headers headers :status status})
                                    (json/generate-string body))]
                  (put! channel put-on-chan (fn [_] (launch-async channel urls))))))))

(2) 接下来,您似乎只处理一个响应。相反,使用 go-loop:

(defn process-async
  [channel func]
  (go-loop []
    (when-let [response (<! channel)]
      (func response)
      (recur))))

(3) 这是您的 http-gets-async 函数。我认为在此处添加缓冲区没有什么坏处,因为它应该可以帮助您在开始时触发大量请求:

(defn http-gets-async
  [func urls]
  (let [channel (chan 1000)]
    (launch-async channel urls)
    (process-async channel func)))

现在,您可以使用背压处理无限数量的 urls。要对此进行测试,请定义一个计数器,然后让您的处理函数递增该计数器以查看您的进度。使用易于攻击的本地主机 URL(不建议向 google 等发出数十万个请求):

(def responses (atom 0))
(http-gets-async (fn [_] (swap! responses inc))
                 (repeat 1000000 "http://localhost:8000"))

因为这都是异步的,您的函数将 return 立即并且您可以看到 @responses 增长。

您可以做的另一件有趣的事情是代替 运行 您在 process-async 中的处理函数,您可以选择将其用作通道本身的传感器。

(defn process-async
  [channel]
  (go-loop []
    (when-let [_ (<! channel)]
      (recur))))

(defn http-gets-async
  [func urls]
  (let [channel (chan 10000 (map func))] ;; <-- transducer on channel
    (launch-async channel urls)
    (process-async channel)))

有很多方法可以做到这一点,包括构造它以便通道关闭(请注意,在上面,它保持打开状态)。如果愿意,您可以使用 java.util.concurrent 原语来帮助解决这方面的问题,而且它们非常易于使用。可能性非常多。