根据谓词将 fs2 流分组为子流

Grouping fs2 streams into sub-streams based on predicate

我需要一个可以解决以下问题的组合器:

test("groupUntil") {
  val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
  val grouped: Stream[IO, Stream[IO, Int]] = s.groupUntil(_ == 1)

  val result =
    for {
      group   <- grouped
      element <- group.fold(0)(_ + _)
  } yield element

  assertEquals(result.compile.toList.unsafeRunSync(), List(10, 9, 5))
}

内部流也必须是惰性的。 (注意,groupUntil 是我要的虚构组合器)。

注意:我必须在内部流的每个元素到达原始流时立即处理它们,也就是说,我迫不及待地想对整个组进行分块。

一种方法 你可以在这里实现惰性是使用 Stream 作为 fold 函数中的容器:

import cats.effect.IO
import fs2.Stream

val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val acc: Stream[IO, Stream[IO, Int]] = Stream.empty
val grouped: Stream[IO, Stream[IO, Int]] = s.fold(acc) {
  case (streamOfStreams, nextInt) if nextInt == 1 =>
    Stream(Stream(nextInt).covary[IO]).append(streamOfStreams)
  case (streamOfStreams, nextInt) =>
    streamOfStreams.head.map(_.append(Stream(nextInt).covary[IO])) ++ 
      streamOfStreams.tail
}.flatten

val result: Stream[IO, IO[Int]] = for {
  group <- grouped
  element = group.compile.foldMonoid
} yield element
assertEquals(result.map(_.unsafeRunSync()).compile.toList.unsafeRunSync().reverse, List(10, 9, 5))

小心,结果你会得到反向流,因为使用流的 last 元素不是个好主意,更好的方法是采用 head 但它需要我们 reverse 列表在我们处理的最后。

另一种方法 是使用 groupAdjacentBy 并将元素分组 predicate:

val groupedOnceAndOthers: fs2.Stream[IO, (Boolean, Chunk[Int])] = 
  s.groupAdjacentBy(x => x == 1)

在这里你会得到成对的组:

(true,Chunk(1)), (false,Chunk(2, 3, 4)), 
(true,Chunk(1)), (false,Chunk(2, 3, 3)), 
(true,Chunk(1)), (false,Chunk(2, 2))

连接组 1 和没有我们可以使用 chunkN (就像 scala List 中的 grouped )和 map 结果来摆脱布尔对和 flatMap 压平 Chunks:

val grouped = groupedOnceAndOthers
  .chunkN(2, allowFewer = true)
  .map(ch => ch.flatMap(_._2).toList)

分组的结果是: List(1, 2, 3, 4) List(1, 2, 3, 3) List(1, 2, 2)

完整的工作示例:

import cats.effect.IO
import fs2.Stream

val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val grouped: Stream[IO, Stream[IO, Int]] = s.groupAdjacentBy(x => x == 1)
  .chunkN(2, allowFewer = true)
  .map(ch => Stream.fromIterator[IO](ch.flatMap(_._2).iterator))

val result: Stream[IO, IO[Int]] = for {
  group <- grouped
  element = group.compile.foldMonoid
} yield element
assertEquals(result.map(_.unsafeRunSync()).compile.toList.unsafeRunSync(), List(10, 9, 5))