将 fs2 流输出拆分为两个文件

Splitting the fs2 stream output to two files

我刚刚开始使用 fs2 流进行冒险。我想要实现的是读取一个文件(一个大文件,这就是我使用 fs2 的原因),对其进行转换并将结果写入两个不同的文件(基于某些谓词)。一些代码(来自 https://github.com/typelevel/fs2),附上我的评论:

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble).toString)
      .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
      /* instead of the last line I want something like this:
      .through(<write temperatures higher than 10 to one file, the rest to the other one>)


不幸的是,据我所知,没有简单的方法可以将 fs2 流一分为二。

您可以做的是通过将值推送到两个队列之一来拆分您的流(第一个队列用于小于 10 的值,第二个用于大于或等于 10 的值)。如果我们使用 NoneTerminatedQueue 那么队列将不会终止,直到我们将 None 放入其中。然后我们可以只使用 dequeue 创建单独的流,直到队列没有关闭。


import java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import fs2.{Stream, io, text}

object FahrenheitToCelsius extends IOApp {

  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  //I split reading into separate method
  def read(blocker: Blocker, over: NoneTerminatedQueue[IO, Double], under: NoneTerminatedQueue[IO, Double]) = io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble))
    .evalMap { value =>
      if (value > 10) { //here we put values to one of queues
        over.enqueue1(Some(value)) //until we put some queues are not close
      } else {
      over.enqueue1(None) *> under.enqueue1(None) //by putting None we terminate queues

  //function write takes as argument source queue and target file
  def write(s: Stream[IO, Double], blocker: Blocker, fileName: String): Stream[IO, Unit] = {
      .through(io.file.writeAll(Paths.get(fileName), blocker))

  val converter: Stream[IO, Unit] = for {
    over <- Stream.eval(Queue.noneTerminated[IO, Double]) //here we create 2 queues
    under <- Stream.eval(Queue.noneTerminated[IO, Double])
    blocker <- Stream.resource(Blocker[IO])
    _ <- write(over.dequeue, blocker, "testdata/celsius-over.txt") //we run reading and writing to both
      .concurrently(write(under.dequeue, blocker, "testdata/celsius-under.txt")) //files concurrently
      .concurrently(read(blocker, over, under)) //stream runs until queue over is not terminated
  } yield ()

  override def run(args: List[String]): IO[ExitCode] =



import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import fs2.io.file.WriteCursor
import java.nio.file.Paths

object Converter extends IOApp {

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    def saveFiltered(in: Stream[IO,Double], blocker: cats.effect.Blocker, filename: String, filter: Double => Boolean) = {
      val processed = in.filter(filter).intersperse("\n").map(_.toString).through(text.utf8Encode)

      Stream.resource(WriteCursor.fromPath[IO](Paths.get(filename), blocker)).flatMap(_.writeAll(processed).void.stream)

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .observe( in => saveFiltered(in, blocker, "testdata/celsius_over.txt", {n => n >= 0}) )
      .through( in => saveFiltered(in, blocker, "testdata/celsius_below.txt", {n => n < 0}) )

  def run(args: List[String]): IO[ExitCode] =


也可以使用 broadcastThrough,它允许将流的所有元素广播到多个管道。

你的问题的完整解决方案可能如下所示(使用 cats effect 3.3.8 和 fs2 3.2.5。这就是为什么它看起来有点不同但无论版本如何,主要思想都是相同的):

import cats.effect.{IO, IOApp}
import fs2.io.file.{Files, Path}
import fs2.{Pipe, Stream, text}

object Converter extends IOApp.Simple {

  val converter: Stream[IO, Unit] = {
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0 / 9.0)

    def saveFiltered(filename: Path, predicate: Double => Boolean): Pipe[IO, Double, Unit] =

      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
        saveFiltered(Path("testdata/celsius_over.txt"), { n => n >= 0 }),
        saveFiltered(Path("testdata/celsius_below.txt"), { n => n < 0 })

  def run: IO[Unit] =

saveFiltered 现在是一个返回 Pipe 的函数,它是使用文件名和谓词构建的。此函数用于为 broadcastThrough 构建两个参数。我用一个小例子测试了它,FWIW 它按预期工作。

broadcastThrough 保证流中的所有元素都发送到所有管道。 Scaladoc 中提到了一个小警告:最慢的管道将导致整个流变慢。我认为在这种特殊情况下这不是问题,因为我猜这两个管道都同样快。


def partition[F[_] : Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
  _.broadcastThrough[F, B](
    _.filter(a => !predicate(a)).through(out)



def saveFiltered2(filename: Path): Pipe[IO, Double, Unit] =


  partition(n => n >= 0,