交织多个流
Interleave multiple streams
我有一个流列表 List[Stream[_]]
,列表的大小在函数开始时已知,每个流的大小等于 n
或 n+1
。我想获得交错流
例如
def myMagicFold[A](s: List[Stream[A]]): Stream[A]
val streams = List(Stream(1,1,1),Stream(2,2,2),Stream(3,3),Stream(4,4))
val result = myMagicFold(streams)
//result = Stream(1,2,3,4,1,2,3,4,1,2)
我正在使用 fs2.Stream
。我的第一次拍摄:
val result = streams.fold(fs2.Stream.empty){
case (s1, s2) => s1.interleaveAll(s2)
}
// result = Stream(1, 4, 3, 4, 2, 3, 1, 2, 1, 2)
我正在寻找基于基本操作的解决方案(map
、fold
、...)
这是一次尝试,它按预期工作...
import cats.effect.IO
import cats.implicits._
import fs2.Stream
def myMagicFold[A](streams: List[Stream[IO, A]]): Stream[IO, A] =
Stream.unfoldEval(streams) { streams =>
streams.traverse { stream =>
stream.head.compile.last
} map { list =>
list.sequence.map { chunk =>
Stream.emits(chunk) -> list.map(_.tail)
}
}
}.flatten
然而,这远不是一个好的解决方案,它效率极低,因为它在每一步都重新评估每个 Stream。
您可以使用以下代码确认:
def stream(name: String, n: Int, value: Int): Stream[IO, Int] =
Stream
.range(start = 0, stopExclusive = n)
.evalMap { i =>
IO {
println(s"${name} - ${i}")
value
}
}
val list = List(stream("A", 3, 1), stream("B", 2, 2), stream("C", 3, 3))
myMagicFold(list).compile.toList.unsafeRunAsync(println)
哪个会打印
A - 0
B - 0
C - 0
A - 0
A - 1
B - 0
B - 1
C - 0
C - 1
A - 0
A - 1
A - 2
B - 0
B - 1
C - 0
C - 1
C - 2
Right(List(1, 2, 3, 1, 2, 3))
我很确定可以使用 Pulls 来解决这个问题,但我对此没有任何经验。
您最初的猜测不错,但是 interleaveAll
太快变平了,所以这就是您没有得到预期顺序的原因。
这是应该实现您尝试实现的目标的代码:
def zipAll[F[_], A](streams: List[Stream[F, A]]): Stream[F, A] =
streams
.foldLeft[Stream[F, List[Option[A]]]](Stream.empty) { (acc, s) =>
zipStreams(acc, s)
}
.flatMap(l => Stream.emits(l.reverse.flatten))
def zipStreams[F[_], A](s1: Stream[F, List[Option[A]]], s2: Stream[F, A]): Stream[F, List[Option[A]]] =
s1.zipAllWith(s2.map(Option(_)))(Nil, Option.empty[A]) { case (acc, a) => a :: acc }
在这种情况下,您将每个流的第 n 个元素添加到列表中,然后转换为 Stream
,后者随后被展平为结果流。
由于 fs2.Stream
是基于拉式的,因此您一次只能在内存中拥有一个列表。
我有一个流列表 List[Stream[_]]
,列表的大小在函数开始时已知,每个流的大小等于 n
或 n+1
。我想获得交错流
例如
def myMagicFold[A](s: List[Stream[A]]): Stream[A]
val streams = List(Stream(1,1,1),Stream(2,2,2),Stream(3,3),Stream(4,4))
val result = myMagicFold(streams)
//result = Stream(1,2,3,4,1,2,3,4,1,2)
我正在使用 fs2.Stream
。我的第一次拍摄:
val result = streams.fold(fs2.Stream.empty){
case (s1, s2) => s1.interleaveAll(s2)
}
// result = Stream(1, 4, 3, 4, 2, 3, 1, 2, 1, 2)
我正在寻找基于基本操作的解决方案(map
、fold
、...)
这是一次尝试,它按预期工作...
import cats.effect.IO
import cats.implicits._
import fs2.Stream
def myMagicFold[A](streams: List[Stream[IO, A]]): Stream[IO, A] =
Stream.unfoldEval(streams) { streams =>
streams.traverse { stream =>
stream.head.compile.last
} map { list =>
list.sequence.map { chunk =>
Stream.emits(chunk) -> list.map(_.tail)
}
}
}.flatten
然而,这远不是一个好的解决方案,它效率极低,因为它在每一步都重新评估每个 Stream。
您可以使用以下代码确认:
def stream(name: String, n: Int, value: Int): Stream[IO, Int] =
Stream
.range(start = 0, stopExclusive = n)
.evalMap { i =>
IO {
println(s"${name} - ${i}")
value
}
}
val list = List(stream("A", 3, 1), stream("B", 2, 2), stream("C", 3, 3))
myMagicFold(list).compile.toList.unsafeRunAsync(println)
哪个会打印
A - 0
B - 0
C - 0
A - 0
A - 1
B - 0
B - 1
C - 0
C - 1
A - 0
A - 1
A - 2
B - 0
B - 1
C - 0
C - 1
C - 2Right(List(1, 2, 3, 1, 2, 3))
我很确定可以使用 Pulls 来解决这个问题,但我对此没有任何经验。
您最初的猜测不错,但是 interleaveAll
太快变平了,所以这就是您没有得到预期顺序的原因。
这是应该实现您尝试实现的目标的代码:
def zipAll[F[_], A](streams: List[Stream[F, A]]): Stream[F, A] =
streams
.foldLeft[Stream[F, List[Option[A]]]](Stream.empty) { (acc, s) =>
zipStreams(acc, s)
}
.flatMap(l => Stream.emits(l.reverse.flatten))
def zipStreams[F[_], A](s1: Stream[F, List[Option[A]]], s2: Stream[F, A]): Stream[F, List[Option[A]]] =
s1.zipAllWith(s2.map(Option(_)))(Nil, Option.empty[A]) { case (acc, a) => a :: acc }
在这种情况下,您将每个流的第 n 个元素添加到列表中,然后转换为 Stream
,后者随后被展平为结果流。
由于 fs2.Stream
是基于拉式的,因此您一次只能在内存中拥有一个列表。