Scalaz 流分块到 N

Scalaz-stream chunking UP to N

给定一个队列:

val queue: Queue[Int] = async.boundedQueue[Int](1000)

我想拉出这个队列并将其流式传输到下游接收器,最多 100 个块。

queue.dequeue.chunk(100).to(downstreamConsumer) 

有点工作,但如果我说 101 条消息,它不会清空队列。将剩下 1 条消息,除非再推入 99 条消息。我想尽可能多地从队列中取出最多 100 条消息,尽可能快地处理我的下游进程。

是否有可用的组合器?

为此,您可能需要在从队列中出队时监控队列的大小。然后,如果大小达到 0,您就不会等待更多元素。事实上,您可以根据队列的大小实施 elastic 批处理的大小调整。 IE。 :

val q = async.unboundedQueue[String]

val deq:Process[Task,(String,Int)] = q.dequeue zip q.size
val elasticChunk: Process1[(String,Int), Vector[String]] = ???
val downstreamConsumer : Sink[Task,Vector[String]] = ???

deq.pipe(elasticChunk) to downstreamConsumer

我实际上用与预期不同的方式解决了这个问题。

scalaz-stream 队列现在包含一个 dequeueBatch 方法,允许对队列中的所有值进行出列,最多 N 个或块。

https://github.com/scalaz/scalaz-stream/issues/338