我如何 "split" fs2 中的流?
How do I "split" a stream in fs2?
我想做这样的事情:
def splitStream[F, A](stream: fs2.Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B)) =
(stream, stream.map(split)
但这不起作用,因为它从源中 "pulls" 两次 - 当我同时耗尽 stream
和 stream.map(split)
时各一次。我该如何防止这种情况?通过维护我自己的内部缓冲区以某种方式切换到基于 "push" 的模型,这样我就不会拉两次?
Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?
是的。例如,您可以使用来自 fs2 的队列:
def splitStream[F[_], A](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
for {
q <- Queue.noneTerminated[F, A]
} yield (stream.evalTap(a => q.enqueue1(Some(a)).onFinalize(q.enqueue1(None)), q.dequeue.map(split))
当然,这里的问题是,如果调用者忽略任何一个流,另一个流将死锁并且永远不会发出任何东西。这通常是您 运行 在尝试将一个流分成多个流时遇到的问题,并且无论何时订阅,都有一个值保证出现在每个子流中。
我通常采用的解决方案是合并较大的操作并使用 broadcast
或 parJoin
:
等运算符
def splitAndRun[F[_]: Concurrent, A](
base: Stream[F, A],
runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
): F[Unit] =
base.broadcastTo(run: _*).compile.drain
在这里,你知道你将拥有多少消费者,所以一开始就不会有被忽略的流。
我想做这样的事情:
def splitStream[F, A](stream: fs2.Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B)) =
(stream, stream.map(split)
但这不起作用,因为它从源中 "pulls" 两次 - 当我同时耗尽 stream
和 stream.map(split)
时各一次。我该如何防止这种情况?通过维护我自己的内部缓冲区以某种方式切换到基于 "push" 的模型,这样我就不会拉两次?
Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?
是的。例如,您可以使用来自 fs2 的队列:
def splitStream[F[_], A](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
for {
q <- Queue.noneTerminated[F, A]
} yield (stream.evalTap(a => q.enqueue1(Some(a)).onFinalize(q.enqueue1(None)), q.dequeue.map(split))
当然,这里的问题是,如果调用者忽略任何一个流,另一个流将死锁并且永远不会发出任何东西。这通常是您 运行 在尝试将一个流分成多个流时遇到的问题,并且无论何时订阅,都有一个值保证出现在每个子流中。
我通常采用的解决方案是合并较大的操作并使用 broadcast
或 parJoin
:
def splitAndRun[F[_]: Concurrent, A](
base: Stream[F, A],
runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
): F[Unit] =
base.broadcastTo(run: _*).compile.drain
在这里,你知道你将拥有多少消费者,所以一开始就不会有被忽略的流。