Clojure - Core.async 管道 + 混淆

Clojure - Core.async Pipeline + take confusion

我很难理解我认为 Clojure 异步库中一个非常简单的概念。我实际上是在用管道创建两个通道,其中输出通道是使用输入通道的 take 函数创建的。

根据我的理解,take 的目的是限制通道在关闭之前将接收的项目数量(如果此时输入通道尚未关闭)。但是,我一直在玩的代码示例并没有产生我预期的结果。

以下面的代码为例:

(def in (chan 1))
(def out (async/take 5 in 1))

(doseq [i (range 10)]
  (go (>! in i)))

(pipeline 4 out (filter even?) in)

(go-loop []
  (when-some [val (<! out)]
    (println val)
    (recur))))

我期望发生的是管道会过滤掉奇数,并且只将偶数传递给 'out' 通道,当输出通道收到 5 个偶数时它会关闭。然而,我看到的是打印到 REPL 的奇数和偶数,如下所示:

2 7 4个 0 8个 6

此时输出通道还没有关闭,运行第二次 doseq 会在最终关闭之前打印一些其他值。

我对这里发生的事情感到非常困惑,当使用 take 而不是管道时它就像一个魅力,当不使用 take 但仍然使用管道时它也有效,将两者结合使用是一个看起来完全不同的故事。我在这里遗漏了一些明显的东西吗?如果这是一个简单的错误,请道歉,这是我第一次(尽管很天真)尝试使用 core.async.

您已将 takepipeline 置于竞争中。他们都从 in 中取出项目并将它们添加到 out。替换out的定义:

(def out (async/chan 3))

例如,得到预期的结果

0
2
4
6
8

如果你真的想使用async/take,你可以这样做:

(def first (async/chan 1))
(def second (async/chan 3))
(pipeline 4 second (filter even?) first)
(def third (async/take 3 second))

(defn run []
  (go
    (doseq [i (range 10)]
      (>! first i)))
  (go (loop []
        (when-some [val (<! third)]
          (println val)
          (recur)))))

结果:

0
2
4