Core.async: 从 promise-chans 集合中获取所有值
Core.async: Take all values from collection of promise-chans
考虑这样的数据集:
(def data [{:url "http://www.url1.com" :type :a}
{:url "http://www.url2.com" :type :a}
{:url "http://www.url3.com" :type :a}
{:url "http://www.url4.com" :type :b}])
那些 URL 的内容应该并行请求。根据项目的 :type 值,这些内容应该由相应的函数解析。解析函数 return 集合,一旦所有响应都到达,就应该将它们连接起来。
所以我们假设有函数 parse-a
和 parse-b
,当它们被传递包含 HTML
内容的字符串时,它们都是 return 字符串集合。
看起来 core.async
可能是一个很好的工具。可以为每个项目设置单独的通道,也可以使用一个通道。我不确定这里哪种方式更可取。对于多个通道,可以将传感器用于 postprocessing/parsing。还有一个特殊的 promise-chan
在这里可能是合适的。
这是一个代码草图,我使用的是基于回调的 HTTP kit
函数。不幸的是,我在 go 块中找不到通用解决方案。
(defn f [data]
(let [chans (map (fn [{:keys [url type]}]
(let [c (promise-chan (map ({:a parse-a :b parse-b} type)))]
(http/get url {} #(put! c %))
c))
data)
result-c (promise-chan)]
(go (put! result-c (concat (<! (nth chans 0))
(<! (nth chans 1))
(<! (nth chans 2))
(<! (nth chans 3)))))
result-c))
结果可以这样读:
(go (prn (<! (f data))))
我会说 promise-chan
在这里弊大于利。问题是大多数 core.async
API(a/merge
、a/reduce
等)依赖于通道将在某个时间点关闭的事实,promise-chan
s 依次关闭永不关闭。
因此,如果坚持使用 core.async
对您来说至关重要,那么更好的解决方案是不使用 promise-chan
,而是使用普通频道,它将在第一个 put!
之后关闭:
...
(let [c (chan 1 (map ({:a parse-a :b parse-b} type)))]
(http/get url {} #(do (put! c %) (close! c)))
c)
...
此时,您使用的是封闭通道,事情变得简单了一些。要收集所有值,您可以这样做:
;; (go (put! result-c (concat (<! (nth chans 0))
;; (<! (nth chans 1))
;; (<! (nth chans 2))
;; (<! (nth chans 3)))))
;; instead of above, now you can do this:
(->> chans
async/merge
(async/reduce into []))
UPD(以下为个人观点):
似乎,使用 core.async
通道作为承诺(以 promise-chan
的形式或在单个 put!
后关闭的通道)不是最好的方法。当事情发展时,结果 core.async
API 总体上(您可能已经注意到)并不像它应该的那样令人愉快。还有几个unsupported constructs, that may force you to write less idiomatic code than it could be. In addition, there is no built-in error handling (if error occurs within go
-block, go
-block will silently return nil
) and to address this you'll need to come up with something of your own (reinvent the wheel). Therefore, if you need promises, I'd recommend to use specific library for that, for example manifold
or promesa
.
使用 async.core
中的 pipeline-async 启动异步操作 类似于 http/get
同时以与输入相同的顺序传送结果:
(let [result (chan)]
(pipeline-async
20 result
(fn [{:keys [url type]} ch]
(let [parse ({:a parse-a :b parse-b} type)
callback #(put! ch (parse %)(partial close! ch))]
(http/get url {} callback)))
(to-chan data))
result)
我也想要这个功能,因为我真的很喜欢 core.async
但我也想在某些地方使用它,比如传统的 JavaScript
promises。我想出了一个使用宏的解决方案。在下面的代码中,<?
与 <!
相同,但如果出现错误,它会抛出。它的行为类似于 Promise.all()
,因为它 return 是来自渠道的所有 returned 值的 vector
,如果它们都成功的话;否则它将 return 第一个错误(因为 <?
将导致它抛出该值)。
(defmacro <<? [chans]
`(let [res# (atom [])]
(doseq [c# ~chans]
(swap! res# conj (serverless.core.async/<? c#)))
@res#))
如果您想查看函数的完整上下文,它位于 GitHub. It's heavily inspired from David Nolen's blog post。
如果有人还在看这个,请添加@OlegTheCat 的答案:
您可以使用单独的错误通道。
(:require [cljs.core.async :as async]
[cljs-http.client :as http])
(:require-macros [cljs.core.async.macros :refer [go]])
(go (as-> [(http/post <url1> <params1>)
(http/post <url2> <params2>)
...]
chans
(async/merge chans (count chans))
(async/reduce conj [] chans)
(async/<! chans)
(<callback> chans)))
考虑这样的数据集:
(def data [{:url "http://www.url1.com" :type :a}
{:url "http://www.url2.com" :type :a}
{:url "http://www.url3.com" :type :a}
{:url "http://www.url4.com" :type :b}])
那些 URL 的内容应该并行请求。根据项目的 :type 值,这些内容应该由相应的函数解析。解析函数 return 集合,一旦所有响应都到达,就应该将它们连接起来。
所以我们假设有函数 parse-a
和 parse-b
,当它们被传递包含 HTML
内容的字符串时,它们都是 return 字符串集合。
看起来 core.async
可能是一个很好的工具。可以为每个项目设置单独的通道,也可以使用一个通道。我不确定这里哪种方式更可取。对于多个通道,可以将传感器用于 postprocessing/parsing。还有一个特殊的 promise-chan
在这里可能是合适的。
这是一个代码草图,我使用的是基于回调的 HTTP kit
函数。不幸的是,我在 go 块中找不到通用解决方案。
(defn f [data]
(let [chans (map (fn [{:keys [url type]}]
(let [c (promise-chan (map ({:a parse-a :b parse-b} type)))]
(http/get url {} #(put! c %))
c))
data)
result-c (promise-chan)]
(go (put! result-c (concat (<! (nth chans 0))
(<! (nth chans 1))
(<! (nth chans 2))
(<! (nth chans 3)))))
result-c))
结果可以这样读:
(go (prn (<! (f data))))
我会说 promise-chan
在这里弊大于利。问题是大多数 core.async
API(a/merge
、a/reduce
等)依赖于通道将在某个时间点关闭的事实,promise-chan
s 依次关闭永不关闭。
因此,如果坚持使用 core.async
对您来说至关重要,那么更好的解决方案是不使用 promise-chan
,而是使用普通频道,它将在第一个 put!
之后关闭:
...
(let [c (chan 1 (map ({:a parse-a :b parse-b} type)))]
(http/get url {} #(do (put! c %) (close! c)))
c)
...
此时,您使用的是封闭通道,事情变得简单了一些。要收集所有值,您可以这样做:
;; (go (put! result-c (concat (<! (nth chans 0))
;; (<! (nth chans 1))
;; (<! (nth chans 2))
;; (<! (nth chans 3)))))
;; instead of above, now you can do this:
(->> chans
async/merge
(async/reduce into []))
UPD(以下为个人观点):
似乎,使用 core.async
通道作为承诺(以 promise-chan
的形式或在单个 put!
后关闭的通道)不是最好的方法。当事情发展时,结果 core.async
API 总体上(您可能已经注意到)并不像它应该的那样令人愉快。还有几个unsupported constructs, that may force you to write less idiomatic code than it could be. In addition, there is no built-in error handling (if error occurs within go
-block, go
-block will silently return nil
) and to address this you'll need to come up with something of your own (reinvent the wheel). Therefore, if you need promises, I'd recommend to use specific library for that, for example manifold
or promesa
.
使用 async.core
中的 pipeline-async 启动异步操作 类似于 http/get
同时以与输入相同的顺序传送结果:
(let [result (chan)]
(pipeline-async
20 result
(fn [{:keys [url type]} ch]
(let [parse ({:a parse-a :b parse-b} type)
callback #(put! ch (parse %)(partial close! ch))]
(http/get url {} callback)))
(to-chan data))
result)
我也想要这个功能,因为我真的很喜欢 core.async
但我也想在某些地方使用它,比如传统的 JavaScript
promises。我想出了一个使用宏的解决方案。在下面的代码中,<?
与 <!
相同,但如果出现错误,它会抛出。它的行为类似于 Promise.all()
,因为它 return 是来自渠道的所有 returned 值的 vector
,如果它们都成功的话;否则它将 return 第一个错误(因为 <?
将导致它抛出该值)。
(defmacro <<? [chans]
`(let [res# (atom [])]
(doseq [c# ~chans]
(swap! res# conj (serverless.core.async/<? c#)))
@res#))
如果您想查看函数的完整上下文,它位于 GitHub. It's heavily inspired from David Nolen's blog post。
如果有人还在看这个,请添加@OlegTheCat 的答案:
您可以使用单独的错误通道。
(:require [cljs.core.async :as async]
[cljs-http.client :as http])
(:require-macros [cljs.core.async.macros :refer [go]])
(go (as-> [(http/post <url1> <params1>)
(http/post <url2> <params2>)
...]
chans
(async/merge chans (count chans))
(async/reduce conj [] chans)
(async/<! chans)
(<callback> chans)))