Clojure core.async 用于数据计算

Clojure core.async for data computation

我已经开始使用 clojure core.async 库。我发现 CSP、通道、go blocks 的概念非常容易使用。但是,我不确定我是否正确使用它们。我有以下代码 -

(def x-ch (chan))
(def y-ch (chan))
(def w1-ch (chan))
(def w2-ch (chan))

; they all return matrices
(go (>! x-ch (Mat/* x (map #(/ 1.0 %) (max-fold x)))))
(go (>! y-ch (Mat/* y (map #(/ 1.0 %) (max-fold y)))))
(go (>! w1-ch (gen-matrix 200 300)))
(go (>! w2-ch (gen-matrix 300 100)))

(let [x1 (<!! (go (<! x-ch)))
        y1 (<!! (go (<! y-ch)))
        w1 (<!! (go (<! w1-ch)))
        w2 (<!! (go (<! w2-ch)))]

    ;; do stuff w/ x1 y1 w1 w2
)

我在符号 xy 中有预定义的(矩阵)向量。在使用它们之前,我需要修改这两个向量。这些向量非常大。我还需要生成两个随机矩阵。由于 go 宏以异步方式开始计算,我将所有四个计算任务拆分为单独的 go 块,并将结果放入通道。然后我有一个 let 块,我从通道中获取值并将它们存储到符号中。它们都在使用阻塞 <!! take 函数,因为它们在主线程上。

我想做的基本上是通过将程序片段拆分为异步进程来加快我的计算时间。这是正确的做法吗?

对于这种处理,future可能稍微合适一些。

link中的例子很容易掌握:

 (def f 
   (future 
     (Thread/sleep 10000) 
     (println "done") 
     100))

处理中,future块立即启动,所以上面确实启动了一个线程,等待10s,完成后打印"done"。

当你需要你可以使用的值时:

(deref f)
; or @f

将块和return值的代码块放在一起。

在同一个示例中,如果您在 10 秒过去之前调用 deref,调用将阻塞直到计算完成。

在您的示例中,由于您只是在等待计算完成,并且不太关心通道参与者之间的消息和交互 future 是我推荐的。所以:

 (future 
    (Mat/* x (map #(/ 1.0 %) (max-fold x))))

go 使用表达式的结果阻止 return 通道,因此您无需为其结果创建中间通道。下面的代码让您同时开始所有 4 个计算,然后阻塞这些值直到它们 return。如果您不需要立即使用某些结果,则可以仅在实际使用该值时才对其进行阻塞。

(let [x1-ch (go (Mat/* x (map #(/ 1.0 %) (max-fold x))))
      y1-ch (go (Mat/* y (map #(/ 1.0 %) (max-fold y))))
      w1-ch (go (gen-matrix 200 300))
      w2-ch (go (gen-matrix 300 100))
      x1 (<!! x1-ch)
      y1 (<!! y1-ch)
      w1 (<!! w1-ch)
      w2 (<!! w2-ch)]
  ;; do stuff w/ x1 y1 w1 w2
  )

如果您希望通过 运行 并行代码更普遍地加速您的程序,那么您可以考虑使用 Clojure 的 Reducers, or Aphyr's Tesser。这些通过将单个计算的工作拆分为可并行化的部分,然后将它们组合在一起来工作。这些将有效地 运行 处理与您的计算机一样多的内核。如果你 运行 你的每个计算都有一个 future 或在一个 go 块中,那么每个计算都将 运行 在一个线程上,一些可能会先于其他的完成并且那些核心将空闲。