core.async with partition-by stateful transducer not keeping state?
(I had a prior question, and assumed I wouldn't have any trouble once I got to core.async.)
Given input data like this:
(require '[clojure.core.async :as a])
(def input-data
  [{:itm_na 1 :seq_no 1 :doc_img "this is a very long "}
   {:itm_na 1 :seq_no 2 :doc_img "sentence from a mainframe "}
   {:itm_na 1 :seq_no 3 :doc_img "system that was built before i was "}
   {:itm_na 1 :seq_no 4 :doc_img "born."}
   {:itm_na 2 :seq_no 1 :doc_img "this is a another very long "}
   {:itm_na 2 :seq_no 2 :doc_img "sentence from the same mainframe "}
   {:itm_na 3 :seq_no 1 :doc_img "Ok here we are again. "}
   {:itm_na 3 :seq_no 2 :doc_img "The mainframe only had 40 char per field so"}
   {:itm_na 3 :seq_no 3 :doc_img "they broke it into multiple rows "}
   {:itm_na 3 :seq_no 4 :doc_img "which seems to be common"}
   {:itm_na 3 :seq_no 5 :doc_img " for the time. "}
   {:itm_na 3 :seq_no 6 :doc_img "thanks for your help."}])
partition-by (as expected) clumps my data into seqs, one per :itm_na (to be collapsed later):
(count (partition-by :itm_na input-data))
;;=> 3
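As a rough sketch of the "collapse later" step mentioned above: each partition can be folded into a single map by concatenating its :doc_img fragments. The names short-rows and collapse-rows are hypothetical and only for illustration; short-rows is a shortened stand-in for input-data.

```clojure
;; Hypothetical, shortened rows in the same shape as input-data.
(def short-rows
  [{:itm_na 1 :seq_no 1 :doc_img "this is a very long "}
   {:itm_na 1 :seq_no 2 :doc_img "sentence."}
   {:itm_na 2 :seq_no 1 :doc_img "another one."}])

(defn collapse-rows
  "Joins the :doc_img fragments of one partition into a single map.
  Assumes rows is non-empty and already ordered by :seq_no."
  [rows]
  {:itm_na  (:itm_na (first rows))
   :doc_img (apply str (map :doc_img rows))})

(mapv collapse-rows (partition-by :itm_na short-rows))
;;=> [{:itm_na 1, :doc_img "this is a very long sentence."}
;;    {:itm_na 2, :doc_img "another one."}]
```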
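However, when I try to do the same thing with a core.async pipeline, for some reason the result is different... How do I get the stateful transducer returned by partition-by to actually keep its state in an async pipeline?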
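(let [source-chan (a/to-chan input-data)
      target-chan (a/chan 100)
      xf          (comp (partition-by :itm_na))]
  ;; run xf over source-chan with parallelism 1, writing to target-chan
  (a/pipeline 1
              target-chan
              xf
              source-chan)
  ;; <!! must be qualified, since core.async is only required :as a
  (count (a/<!! (a/into [] target-chan))))
;;=> 12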
Shouldn't this be 3?
Oddly, when I attach xf directly to a channel as shown below, I get the expected result. I'm not sure why a/pipeline behaves differently.
(let [xf (comp (partition-by :itm_na))
      ch (a/chan 1 xf)]
  (a/onto-chan ch input-data)
  ;; <!! must be qualified, since core.async is only required :as a
  (count (a/<!! (a/into [] ch))))
;;=> 3
From the docs... note the "stateful" bit:
(clojure.repl/doc partition-by)
-------------------------
clojure.core/partition-by
([f] [f coll])
Applies f to each value in coll, splitting it each time f returns a
new value. Returns a lazy seq of partitions. Returns a stateful
transducer when no collection is provided.
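Rich Hickey briefly highlighted this particular case in his talk: you can't use pipeline with stateful transducers, mainly because of pipeline's parallel nature. That is why partition-by never sees two consecutive items through the same state and emits one partition per item (12) instead of one per :itm_na (3).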
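One possible workaround, sketched here under the assumption that parallelism is not actually needed for this step: put the transducer on a single channel, as in the working example from the question, and connect the source to it with a/pipe instead of a/pipeline. The names sample-rows, partition-via-pipeline and partition-via-pipe are hypothetical, and sample-rows is a shortened stand-in for input-data.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical, shortened stand-in for the input-data in the question.
(def sample-rows
  [{:itm_na 1 :seq_no 1} {:itm_na 1 :seq_no 2}
   {:itm_na 2 :seq_no 1}
   {:itm_na 3 :seq_no 1} {:itm_na 3 :seq_no 2}])

(defn partition-via-pipeline
  "Reproduces the problem: the partition-by state is not shared across items."
  [rows]
  (let [source-chan (a/to-chan rows)
        target-chan (a/chan 10)]
    (a/pipeline 1 target-chan (partition-by :itm_na) source-chan)
    (a/<!! (a/into [] target-chan))))

(defn partition-via-pipe
  "Workaround sketch: the transducer lives on one channel, so its state
  spans every item that flows through that channel."
  [rows]
  (let [source-chan (a/to-chan rows)
        target-chan (a/chan 10 (partition-by :itm_na))]
    ;; pipe closes target-chan when source-chan closes, which lets
    ;; partition-by flush its final partition.
    (a/pipe source-chan target-chan)
    (a/<!! (a/into [] target-chan))))

(count (partition-via-pipeline sample-rows))
;;=> 5   one partition per input item

(count (partition-via-pipe sample-rows))
;;=> 3   one partition per :itm_na
```

If parallel work is still wanted, a stateless transducer (for example one that collapses each already-built partition) could run in a/pipeline downstream of the partitioning channel.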