交织多个流

Interleave multiple streams

我有一个流列表 List[Stream[_]],列表的大小在函数开始时已知,每个流的大小等于 nn+1。我想获得交错流 例如

def myMagicFold[A](s: List[Stream[A]]): Stream[A]

val streams = List(Stream(1,1,1),Stream(2,2,2),Stream(3,3),Stream(4,4)) 

val result = myMagicFold(streams)

//result = Stream(1,2,3,4,1,2,3,4,1,2)

我正在使用 fs2.Stream。我的第一次拍摄:

val result = streams.fold(fs2.Stream.empty){
   case (s1, s2) => s1.interleaveAll(s2)
}

// result = Stream(1, 4, 3, 4, 2, 3, 1, 2, 1, 2)

我正在寻找基于基本操作的解决方案(mapfold、...)

这是一次尝试,它按预期工作...

import cats.effect.IO
import cats.implicits._
import fs2.Stream

def myMagicFold[A](streams: List[Stream[IO, A]]): Stream[IO, A] =
  Stream.unfoldEval(streams) { streams =>
    streams.traverse { stream =>
      stream.head.compile.last
    } map { list =>
      list.sequence.map { chunk =>
        Stream.emits(chunk) -> list.map(_.tail)
      }
    }
  }.flatten

然而,这远不是一个好的解决方案,它效率极低,因为它在每一步都重新评估每个 Stream
您可以使用以下代码确认:

def stream(name: String, n: Int, value: Int): Stream[IO, Int] =
  Stream
    .range(start = 0, stopExclusive = n)
    .evalMap { i =>
      IO {
        println(s"${name} - ${i}")
        value
      }
    }
    
val list = List(stream("A", 3, 1), stream("B", 2, 2), stream("C", 3, 3))
myMagicFold(list).compile.toList.unsafeRunAsync(println)

哪个会打印

A - 0
B - 0
C - 0
A - 0
A - 1
B - 0
B - 1
C - 0
C - 1
A - 0
A - 1
A - 2
B - 0
B - 1
C - 0
C - 1
C - 2

Right(List(1, 2, 3, 1, 2, 3))

我很确定可以使用 Pulls 来解决这个问题,但我对此没有任何经验。

您最初的猜测不错,但是 interleaveAll 太快变平了,所以这就是您没有得到预期顺序的原因。 这是应该实现您尝试实现的目标的代码:


  def zipAll[F[_], A](streams: List[Stream[F, A]]): Stream[F, A] =
    streams
      .foldLeft[Stream[F, List[Option[A]]]](Stream.empty) { (acc, s) =>
        zipStreams(acc, s)
      }
      .flatMap(l => Stream.emits(l.reverse.flatten))

  def zipStreams[F[_], A](s1: Stream[F, List[Option[A]]], s2: Stream[F, A]): Stream[F, List[Option[A]]] =
    s1.zipAllWith(s2.map(Option(_)))(Nil, Option.empty[A]) { case (acc, a) => a :: acc }

在这种情况下,您将每个流的第 n 个元素添加到列表中,然后转换为 Stream,后者随后被展平为结果流。 由于 fs2.Stream 是基于拉式的,因此您一次只能在内存中拥有一个列表。