Clojure,如何从到达 core.async 聚合器的元素创建向量

Clojure, how to create a vector from elements arriving in a core.async aggregator

我已关注 here 并且正在将称为分段的内容发送到聚合器。这些片段都到了,我可以在它们到达时打印出来。但我想做的是在它们到达时从它们中创建一个不可变的数据结构(一个向量)。或者甚至等待他们全部到达,然后制作矢量。我能够知道最后一个到达的时间并对其进行分类。我需要 conj 到达的段到现有的到目前为止构建的向量。我习惯于从函数调用中使用 returns 创建这样的向量,但我看不到如何在线程或 go 块中使用此功能。

假设,由于您能够接收多个项目,您的异步代码已经在循环中。

为了从您获得的项目构建向量,您应该使用循环绑定。

(def acc-chan
  (>/go-loop [accumulator []]
      (let [item (>/<! source-chan)]
        (if (nil? item)
          accumulator
          (recur (conj accumulator item)))))

go-loop调用会立即return一个acc-chan,它会在循环退出时接收循环的return值(累加器)。累加器在循环的每次迭代中重新绑定,将另一个项目添加到末尾。当源关闭时,累加器从循环中 returned,并放置到 acc-chan 上,您可以在其中读取它并使用该值。

另一种选择是使用 (clojure.core.async/into [] source-chan) 。与异步工具包中的其他 "reducer-chans" 一样,它也是基于这样的假设:当您准备好接收结果时,您将 close! 源通道。