根据谓词将 fs2 流分组为子流
Grouping fs2 streams into sub-streams based on predicate
我需要一个可以解决以下问题的组合器:
test("groupUntil") {
val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val grouped: Stream[IO, Stream[IO, Int]] = s.groupUntil(_ == 1)
val result =
for {
group <- grouped
element <- group.fold(0)(_ + _)
} yield element
assertEquals(result.compile.toList.unsafeRunSync(), List(10, 9, 5))
}
内部流也必须是惰性的。 (注意,groupUntil
是我要的虚构组合器)。
注意:我必须在内部流的每个元素到达原始流时立即处理它们,也就是说,我迫不及待地想对整个组进行分块。
一种方法 你可以在这里实现惰性是使用 Stream
作为 fold
函数中的容器:
import cats.effect.IO
import fs2.Stream
val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val acc: Stream[IO, Stream[IO, Int]] = Stream.empty
val grouped: Stream[IO, Stream[IO, Int]] = s.fold(acc) {
case (streamOfStreams, nextInt) if nextInt == 1 =>
Stream(Stream(nextInt).covary[IO]).append(streamOfStreams)
case (streamOfStreams, nextInt) =>
streamOfStreams.head.map(_.append(Stream(nextInt).covary[IO])) ++
streamOfStreams.tail
}.flatten
val result: Stream[IO, IO[Int]] = for {
group <- grouped
element = group.compile.foldMonoid
} yield element
assertEquals(result.map(_.unsafeRunSync()).compile.toList.unsafeRunSync().reverse, List(10, 9, 5))
小心,结果你会得到反向流,因为使用流的 last
元素不是个好主意,更好的方法是采用 head
但它需要我们 reverse
列表在我们处理的最后。
另一种方法 是使用 groupAdjacentBy
并将元素分组 predicate
:
val groupedOnceAndOthers: fs2.Stream[IO, (Boolean, Chunk[Int])] =
s.groupAdjacentBy(x => x == 1)
在这里你会得到成对的组:
(true,Chunk(1)), (false,Chunk(2, 3, 4)),
(true,Chunk(1)), (false,Chunk(2, 3, 3)),
(true,Chunk(1)), (false,Chunk(2, 2))
连接组 1
和没有我们可以使用 chunkN
(就像 scala List
中的 grouped
)和 map
结果来摆脱布尔对和 flatMap
压平 Chunk
s:
val grouped = groupedOnceAndOthers
.chunkN(2, allowFewer = true)
.map(ch => ch.flatMap(_._2).toList)
分组的结果是:
List(1, 2, 3, 4) List(1, 2, 3, 3) List(1, 2, 2)
完整的工作示例:
import cats.effect.IO
import fs2.Stream
val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val grouped: Stream[IO, Stream[IO, Int]] = s.groupAdjacentBy(x => x == 1)
.chunkN(2, allowFewer = true)
.map(ch => Stream.fromIterator[IO](ch.flatMap(_._2).iterator))
val result: Stream[IO, IO[Int]] = for {
group <- grouped
element = group.compile.foldMonoid
} yield element
assertEquals(result.map(_.unsafeRunSync()).compile.toList.unsafeRunSync(), List(10, 9, 5))
我需要一个可以解决以下问题的组合器:
test("groupUntil") {
val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val grouped: Stream[IO, Stream[IO, Int]] = s.groupUntil(_ == 1)
val result =
for {
group <- grouped
element <- group.fold(0)(_ + _)
} yield element
assertEquals(result.compile.toList.unsafeRunSync(), List(10, 9, 5))
}
内部流也必须是惰性的。 (注意,groupUntil
是我要的虚构组合器)。
注意:我必须在内部流的每个元素到达原始流时立即处理它们,也就是说,我迫不及待地想对整个组进行分块。
一种方法 你可以在这里实现惰性是使用 Stream
作为 fold
函数中的容器:
import cats.effect.IO
import fs2.Stream
val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val acc: Stream[IO, Stream[IO, Int]] = Stream.empty
val grouped: Stream[IO, Stream[IO, Int]] = s.fold(acc) {
case (streamOfStreams, nextInt) if nextInt == 1 =>
Stream(Stream(nextInt).covary[IO]).append(streamOfStreams)
case (streamOfStreams, nextInt) =>
streamOfStreams.head.map(_.append(Stream(nextInt).covary[IO])) ++
streamOfStreams.tail
}.flatten
val result: Stream[IO, IO[Int]] = for {
group <- grouped
element = group.compile.foldMonoid
} yield element
assertEquals(result.map(_.unsafeRunSync()).compile.toList.unsafeRunSync().reverse, List(10, 9, 5))
小心,结果你会得到反向流,因为使用流的 last
元素不是个好主意,更好的方法是采用 head
但它需要我们 reverse
列表在我们处理的最后。
另一种方法 是使用 groupAdjacentBy
并将元素分组 predicate
:
val groupedOnceAndOthers: fs2.Stream[IO, (Boolean, Chunk[Int])] =
s.groupAdjacentBy(x => x == 1)
在这里你会得到成对的组:
(true,Chunk(1)), (false,Chunk(2, 3, 4)),
(true,Chunk(1)), (false,Chunk(2, 3, 3)),
(true,Chunk(1)), (false,Chunk(2, 2))
连接组 1
和没有我们可以使用 chunkN
(就像 scala List
中的 grouped
)和 map
结果来摆脱布尔对和 flatMap
压平 Chunk
s:
val grouped = groupedOnceAndOthers
.chunkN(2, allowFewer = true)
.map(ch => ch.flatMap(_._2).toList)
分组的结果是:
List(1, 2, 3, 4) List(1, 2, 3, 3) List(1, 2, 2)
完整的工作示例:
import cats.effect.IO
import fs2.Stream
val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val grouped: Stream[IO, Stream[IO, Int]] = s.groupAdjacentBy(x => x == 1)
.chunkN(2, allowFewer = true)
.map(ch => Stream.fromIterator[IO](ch.flatMap(_._2).iterator))
val result: Stream[IO, IO[Int]] = for {
group <- grouped
element = group.compile.foldMonoid
} yield element
assertEquals(result.map(_.unsafeRunSync()).compile.toList.unsafeRunSync(), List(10, 9, 5))