How to build a chunked lazy-seq that blocks?

I'd like to use chunked cons, or some other approach, to create a lazy-seq that blocks. Given a source:

(defn -source- [] (repeatedly (fn [] (future (Thread/sleep 100) [1 2]))))

(take 2 (-source-))
;; => (<future> <future>)

I'd like a function called injest such that:

(take 3 (injest (-source-)))
=> [;; sleep 100
    1 2 
    ;; sleep 100
    1]

(take 6 (injest (-source-)))
=> [;; sleep 100
    1 2 
    ;; sleep 100
    1 2 
    ;; sleep 100
    1 2]

;; ... etc ...

How would I write this function?

I think you're fine just deref'ing the elements of the lazy seq and forcing only the entries you need to consume, like this:

(defn -source- [] (repeatedly (fn [] (future (Thread/sleep 100) [1 2]))))

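(defn injest [src]
  ;; deref each future only as the seq is consumed; note that each
  ;; element is still a whole [1 2] batch, not a flattened item
  (map deref src))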

;; (time (dorun (take 3 (injest (-source-)))))
;; => "Elapsed time: 303.432003 msecs"

;; (time (dorun (take 6 (injest (-source-)))))
;; => "Elapsed time: 603.319103 msecs"
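A quick way to check that this only forces what is consumed is to count how many times the source function runs. In the sketch below, `calls` and `counting-source` are hypothetical names, and the `Thread/sleep` is dropped so it runs instantly. Because `repeatedly` is not chunked, each future is created only when its element is realized and is deref'd right away, which is also why the timings above add up sequentially.

```clojure
;; Hypothetical counter: how many futures the source has created so far.
(def calls (atom 0))

(defn counting-source []
  ;; Same shape as -source-, minus the sleep, plus a counter.
  (repeatedly (fn [] (swap! calls inc) (future [1 2]))))

(defn injest [src]
  (map deref src))

;; Realize exactly three elements; each one is a whole [1 2] batch.
(def result (doall (take 3 (injest (counting-source)))))

(println result @calls)
;; => ([1 2] [1 2] [1 2]) 3
```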

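On the other hand, depending on the number of items, it may be better to avoid creating lots of futures and instead use a lazy-seq that may block for a while, depending on the element's index.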
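A minimal sketch of that alternative, assuming the blocking work may run on the consuming thread: the hypothetical `blocking-source` sleeps directly inside the lazy seq instead of wrapping the work in a future, so there is nothing to deref and the batches only need to be flattened lazily. `for` is used to flatten because, unlike `mapcat`, it does not realize batches ahead of time.

```clojure
;; Hypothetical future-free source: each element blocks ~100 ms
;; when it is realized, on the thread that consumes the seq.
(defn blocking-source []
  (repeatedly (fn [] (Thread/sleep 100) [1 2])))

;; Flatten lazily; a batch is realized only when its items are needed.
(defn injest-blocking [src]
  (for [batch src
        item batch]
    item))

;; Should take roughly 200 ms: only two batches have to be realized.
(time (dorun (take 3 (injest-blocking (blocking-source)))))
```

The trade-off is that nothing runs ahead in the background: every batch costs its full delay at the moment it is consumed.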

This source blocks naturally as you consume it, so you don't need to do anything very fancy. Simply (mapcat deref) is almost enough:
(doseq [x (take 16 (mapcat deref (-source-)))]
  (println {:value x :time (System/currentTimeMillis)}))
{:value 1, :time 1597725323091}
{:value 2, :time 1597725323092}
{:value 1, :time 1597725323092}
{:value 2, :time 1597725323093}
{:value 1, :time 1597725323093}
{:value 2, :time 1597725323093}
{:value 1, :time 1597725323194}
{:value 2, :time 1597725323195}
{:value 1, :time 1597725323299}
{:value 2, :time 1597725323300}
{:value 1, :time 1597725323406}
{:value 2, :time 1597725323406}
{:value 1, :time 1597725323510}
{:value 2, :time 1597725323511}

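Notice how the first few items all arrive at once, and after that each pair is staggered by the delay you'd expect? This is due to the well-known(?) fact that, for performance reasons, apply (and therefore mapcat, which is implemented with apply concat) is more eager than necessary. If getting the right delay even on the first few items matters to you, you can simply implement your own version of apply concat that doesn't optimize for short input lists.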
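The eagerness is easy to observe with a counter. In this sketch (the atom and function names are hypothetical), `iterate` supplies an unchunked seq, so every realized batch bumps its counter once. Building `(apply concat ...)` realizes several batches before a single item is consumed, while the `for`-based flattening realizes none. The exact number realized by `apply` is an implementation detail of Clojure's variadic dispatch, so the sketch only checks that it is more than one.

```clojure
;; Counters for how many batches each approach has realized.
(def realized-by-apply (atom 0))
(def realized-by-for (atom 0))

(defn counted-batches
  "Lazy, unchunked seq of one-element batches that counts its realizations."
  [counter]
  (map (fn [i] (swap! counter inc) [i]) (iterate inc 0)))

;; Build both flattened seqs without consuming any items.
(def via-apply (apply concat (counted-batches realized-by-apply)))
(def via-for (for [batch (counted-batches realized-by-for), item batch] item))

(println "apply concat realized:" @realized-by-apply) ; more than 1
(println "for realized:" @realized-by-for)             ; 0
```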

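(defn ingest [xs]
  ;; Hand-rolled lazy concat: a batch is only realized and deref'd once the
  ;; previous one is used up. Using rest/seq rather than next avoids
  ;; realizing (and blocking on) the following batch ahead of time.
  ((fn step [curr remaining]
     (lazy-seq
       (cond (seq curr) (cons (first curr) (step (rest curr) remaining))
             (seq remaining) (step (deref (first remaining)) (rest remaining)))))
   nil xs))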

A. Webb suggested an equivalent but simpler implementation in the comments:

(defn ingest [coll]
  (for [batch coll,
        item @batch]
    item))
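A quick usage check of this version on a finite input (the futures here are made up for illustration): batches are deref'd in order, an empty batch simply contributes nothing, and the last batch is unpacked completely.

```clojure
(defn ingest [coll]
  (for [batch coll,
        item @batch]
    item))

;; A finite source with an empty batch in the middle.
(def batches [(future [1 2]) (future []) (future [3])])

(ingest batches)
;; => (1 2 3)
```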

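You can solve this by iterating a state machine. I don't think it's affected by the apply-related optimization others have pointed out, but I'm not sure whether this approach has other problems: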

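(defn step-state [[current-element-to-unpack input-seq]]
  (cond
    ;; finish unpacking the current batch before touching the input; checking
    ;; input-seq first would drop the tail of the last batch of a finite input
    (seq current-element-to-unpack) [(rest current-element-to-unpack) input-seq]
    (seq input-seq) [(deref (first input-seq)) (rest input-seq)]
    :else nil))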

(defn injest [input-seq]
  (->> [[] input-seq]
       (iterate step-state)
       (take-while some?)
       (map first)
       (filter seq)
       (map first)))