如何构建一个阻塞的分块惰性序列?
how to build a chunked lazy-seq that blocks?
我想使用分块 cons 或其他方式来创建 lazy-seq
块。给定来源:
(defn -source- [] (repeatedly (fn [] (future (Thread/sleep 100) [1 2]))))
(take 2 (-source-))
;; => (<future> <future>)
我想要一个名为 injest
的函数,其中:
(take 3 (injest (-source-)))
=> [;; sleep 100
1 2
;; sleep 100
1]
(take 6 (injest (-source-)))
=> [;; sleep 100
1 2
;; sleep 100
1 2
;; sleep 100
1 2]
;; ... etc ...
我将如何编写这个函数?
我认为你只需要 deref
'ing 惰性序列的元素就可以了,并且只强制消耗你需要的条目,就像这样:
(defn -source- [] (repeatedly (fn [] (future (Thread/sleep 100) [1 2]))))
(defn injest [src]
(map deref src))
;; (time (dorun (take 3 (injest (-source-)))))
;; => "Elapsed time: 303.432003 msecs"
;; (time (dorun (take 6 (injest (-source-)))))
;; => "Elapsed time: 603.319103 msecs"
另一方面,我认为根据项目的数量,最好避免创建大量期货并使用 lazy-seq,这取决于元素的索引可能会阻塞一段时间.
当您使用此源时,它会自然地阻塞,因此您不必做任何非常花哨的事情。只需 (mapcat deref)
:
就足够了
(doseq [x (take 16 (mapcat deref (-source- )))]
(println {:value x :time (System/currentTimeMillis)}))
{:value 1, :time 1597725323091}
{:value 2, :time 1597725323092}
{:value 1, :time 1597725323092}
{:value 2, :time 1597725323093}
{:value 1, :time 1597725323093}
{:value 2, :time 1597725323093}
{:value 1, :time 1597725323194}
{:value 2, :time 1597725323195}
{:value 1, :time 1597725323299}
{:value 2, :time 1597725323300}
{:value 1, :time 1597725323406}
{:value 2, :time 1597725323406}
{:value 1, :time 1597725323510}
{:value 2, :time 1597725323511}
请注意前几项是如何同时出现的,然后每对都按您预期的时间错开?这是由于 well-known(?) 事实,即出于性能原因,apply
(因此 mapcat
是用 apply concat
实现的)比必要的更急切。如果即使在前几项上获得正确的延迟对您来说也很重要,您可以简单地实现您自己的 apply concat
版本,该版本不针对短输入列表进行优化。
(defn ingest [xs]
(when-let [coll (seq (map (comp seq deref) xs))]
((fn step [curr remaining]
(lazy-seq
(cond curr (cons (first curr) (step (next curr) remaining))
remaining (step (first remaining) (next remaining)))))
(first coll) (next coll))))
一个。 Webb 在评论中提出了一个等效但更简单的实现:
(defn ingest [coll]
(for [batch coll,
item @batch]
item))
可以通过迭代状态机来解决。我不认为这会受到其他人指出的与 apply
相关的优化的影响,但我不确定这种方法是否还有其他问题:
(defn step-state [[current-element-to-unpack input-seq]]
(cond
(empty? input-seq) nil
(empty? current-element-to-unpack) [(deref (first input-seq)) (rest input-seq)]
:default [(rest current-element-to-unpack) input-seq]))
(defn injest [input-seq]
(->> [[] input-seq]
(iterate step-state)
(take-while some?)
(map first)
(filter seq)
(map first)))
我想使用分块 cons 或其他方式来创建 lazy-seq
块。给定来源:
(defn -source- [] (repeatedly (fn [] (future (Thread/sleep 100) [1 2]))))
(take 2 (-source-))
;; => (<future> <future>)
我想要一个名为 injest
的函数,其中:
(take 3 (injest (-source-)))
=> [;; sleep 100
1 2
;; sleep 100
1]
(take 6 (injest (-source-)))
=> [;; sleep 100
1 2
;; sleep 100
1 2
;; sleep 100
1 2]
;; ... etc ...
我将如何编写这个函数?
我认为你只需要 deref
'ing 惰性序列的元素就可以了,并且只强制消耗你需要的条目,就像这样:
(defn -source- [] (repeatedly (fn [] (future (Thread/sleep 100) [1 2]))))
(defn injest [src]
(map deref src))
;; (time (dorun (take 3 (injest (-source-)))))
;; => "Elapsed time: 303.432003 msecs"
;; (time (dorun (take 6 (injest (-source-)))))
;; => "Elapsed time: 603.319103 msecs"
另一方面,我认为根据项目的数量,最好避免创建大量期货并使用 lazy-seq,这取决于元素的索引可能会阻塞一段时间.
当您使用此源时,它会自然地阻塞,因此您不必做任何非常花哨的事情。只需 (mapcat deref)
:
(doseq [x (take 16 (mapcat deref (-source- )))]
(println {:value x :time (System/currentTimeMillis)}))
{:value 1, :time 1597725323091}
{:value 2, :time 1597725323092}
{:value 1, :time 1597725323092}
{:value 2, :time 1597725323093}
{:value 1, :time 1597725323093}
{:value 2, :time 1597725323093}
{:value 1, :time 1597725323194}
{:value 2, :time 1597725323195}
{:value 1, :time 1597725323299}
{:value 2, :time 1597725323300}
{:value 1, :time 1597725323406}
{:value 2, :time 1597725323406}
{:value 1, :time 1597725323510}
{:value 2, :time 1597725323511}
请注意前几项是如何同时出现的,然后每对都按您预期的时间错开?这是由于 well-known(?) 事实,即出于性能原因,apply
(因此 mapcat
是用 apply concat
实现的)比必要的更急切。如果即使在前几项上获得正确的延迟对您来说也很重要,您可以简单地实现您自己的 apply concat
版本,该版本不针对短输入列表进行优化。
(defn ingest [xs]
(when-let [coll (seq (map (comp seq deref) xs))]
((fn step [curr remaining]
(lazy-seq
(cond curr (cons (first curr) (step (next curr) remaining))
remaining (step (first remaining) (next remaining)))))
(first coll) (next coll))))
一个。 Webb 在评论中提出了一个等效但更简单的实现:
(defn ingest [coll]
(for [batch coll,
item @batch]
item))
可以通过迭代状态机来解决。我不认为这会受到其他人指出的与 apply
相关的优化的影响,但我不确定这种方法是否还有其他问题:
(defn step-state [[current-element-to-unpack input-seq]]
(cond
(empty? input-seq) nil
(empty? current-element-to-unpack) [(deref (first input-seq)) (rest input-seq)]
:default [(rest current-element-to-unpack) input-seq]))
(defn injest [input-seq]
(->> [[] input-seq]
(iterate step-state)
(take-while some?)
(map first)
(filter seq)
(map first)))