core.async 有状态转换器不保持状态的分区?

core.async with partition-by stateful transducer not keeping state?

(我有一个先前的问题 ,并假设我在到达 core.async 时不会有问题)

给定这样的输入数据:

(require '[clojure.core.async :as a])

(def input-data
  [{:itm_na 1 :seq_no 1  :doc_img "this is a very long "}
   {:itm_na 1 :seq_no 2  :doc_img "sentence from a mainframe "}
   {:itm_na 1 :seq_no 3  :doc_img "system that was built before i was "}
   {:itm_na 1 :seq_no 4  :doc_img "born."}
   {:itm_na 2 :seq_no 1  :doc_img "this is a another very long "}
   {:itm_na 2 :seq_no 2  :doc_img "sentence from the same mainframe "}
   {:itm_na 3 :seq_no 1  :doc_img "Ok here we are again. "}
   {:itm_na 3 :seq_no 2  :doc_img "The mainframe only had 40 char per field so"}
   {:itm_na 3 :seq_no 3  :doc_img "they broke it into multiple rows "}
   {:itm_na 3 :seq_no 4  :doc_img "which seems to be common"}
   {:itm_na 3 :seq_no 5  :doc_img " for the time. "}
   {:itm_na 3 :seq_no 6  :doc_img "thanks for your help."}])

partition-by(正如预期的那样)将我的数据聚集到 seq 中(以便稍后折叠):

(count (partition-by :itm_na input-data ))
;;=> 3

但是,当我出于某种原因尝试使用 core.async 管道执行此操作时 似乎不一样...我如何获得 stateful transducer 的一部分 partition-by 在异步管道中实际上保留状态?

(let
    [source-chan (a/to-chan input-data)
     target-chan (a/chan 100)
     xf (comp (partition-by :itm_na))
     ]
  (a/pipeline 1
              target-chan
              xf
              source-chan)
  (count (<!! (a/into [] target-chan))))

;;=>12

这应该是 3?

奇怪的是,当我将 xf 绑定到如下所示的频道时,我得到了预期的结果。我不确定为什么 a/pipeline 表现不同。

(let [xf (comp (partition-by :itm_na))
      ch (a/chan 1 xf)]
  (a/onto-chan ch input-data)
  (count (<!! (a/into [] ch))))
=>3

从文档中...提到 stateful 位:

(clojure.repl/doc partition-by) 
-------------------------
clojure.core/partition-by
([f] [f coll])
  Applies f to each value in coll, splitting it each time f returns a
   new value.  Returns a lazy seq of partitions.  Returns a stateful
   transducer when no collection is provided.

这个特殊情况是 Rich Hickey 在他的演讲中 briefly highlighted 提出的:您不能将 pipeline 与有状态转换器一起使用,主要是因为 pipeline 的并行性质。