将 fs2 流输出拆分为两个文件
Splitting the fs2 stream output to two files
我刚刚开始使用 fs2 流进行冒险。我想要实现的是读取一个文件(一个大文件,这就是我使用 fs2 的原因),对其进行转换并将结果写入两个不同的文件(基于某些谓词)。一些代码(来自 https://github.com/typelevel/fs2),附上我的评论:
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
/* instead of the last line I want something like this:
.through(<write temperatures higher than 10 to one file, the rest to the other one>)
*/
}
最有效的方法是什么?显而易见的解决方案是让两个流具有不同的过滤器,但效率低下(将有两次通过)。
不幸的是,据我所知,没有简单的方法可以将 fs2 流一分为二。
您可以做的是通过将值推送到两个队列之一来拆分您的流(第一个队列用于小于 10 的值,第二个用于大于或等于 10 的值)。如果我们使用 NoneTerminatedQueue
那么队列将不会终止,直到我们将 None
放入其中。然后我们可以只使用 dequeue
创建单独的流,直到队列没有关闭。
下面的示例解决方案。我将写入文件和读取分成不同的方法:
import java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import fs2.{Stream, io, text}
object FahrenheitToCelsius extends IOApp {
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0 / 9.0)
//I split reading into separate method
def read(blocker: Blocker, over: NoneTerminatedQueue[IO, Double], under: NoneTerminatedQueue[IO, Double]) = io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble))
.evalMap { value =>
if (value > 10) { //here we put values to one of queues
over.enqueue1(Some(value)) //until we put some queues are not close
} else {
under.enqueue1(Some(value))
}
}
.onFinalize(
over.enqueue1(None) *> under.enqueue1(None) //by putting None we terminate queues
)
//function write takes as argument source queue and target file
def write(s: Stream[IO, Double], blocker: Blocker, fileName: String): Stream[IO, Unit] = {
s.map(_.toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get(fileName), blocker))
}
val converter: Stream[IO, Unit] = for {
over <- Stream.eval(Queue.noneTerminated[IO, Double]) //here we create 2 queues
under <- Stream.eval(Queue.noneTerminated[IO, Double])
blocker <- Stream.resource(Blocker[IO])
_ <- write(over.dequeue, blocker, "testdata/celsius-over.txt") //we run reading and writing to both
.concurrently(write(under.dequeue, blocker, "testdata/celsius-under.txt")) //files concurrently
.concurrently(read(blocker, over, under)) //stream runs until queue over is not terminated
} yield ()
override def run(args: List[String]): IO[ExitCode] =
converter
.compile
.drain
.as(ExitCode.Success)
}
我找到了另一个解决方案。这是:
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import fs2.io.file.WriteCursor
import java.nio.file.Paths
object Converter extends IOApp {
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
def saveFiltered(in: Stream[IO,Double], blocker: cats.effect.Blocker, filename: String, filter: Double => Boolean) = {
val processed = in.filter(filter).intersperse("\n").map(_.toString).through(text.utf8Encode)
Stream.resource(WriteCursor.fromPath[IO](Paths.get(filename), blocker)).flatMap(_.writeAll(processed).void.stream)
}
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble))
.observe( in => saveFiltered(in, blocker, "testdata/celsius_over.txt", {n => n >= 0}) )
.through( in => saveFiltered(in, blocker, "testdata/celsius_below.txt", {n => n < 0}) )
}
def run(args: List[String]): IO[ExitCode] =
converter.compile.drain.as(ExitCode.Success)
}
我认为它比涉及队列的答案更容易理解(尽管队列似乎是类似情况的常见解决方案)。
也可以使用 broadcastThrough
,它允许将流的所有元素广播到多个管道。
你的问题的完整解决方案可能如下所示(使用 cats effect 3.3.8 和 fs2 3.2.5。这就是为什么它看起来有点不同但无论版本如何,主要思想都是相同的):
import cats.effect.{IO, IOApp}
import fs2.io.file.{Files, Path}
import fs2.{Pipe, Stream, text}
object Converter extends IOApp.Simple {
val converter: Stream[IO, Unit] = {
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0 / 9.0)
def saveFiltered(filename: Path, predicate: Double => Boolean): Pipe[IO, Double, Unit] =
_.filter(predicate)
.map(_.toString)
.through(text.utf8.encode)
.through(Files[IO].writeAll(filename))
Files[IO].readAll(Path("testdata/fahrenheit.txt"))
.through(text.utf8.decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble))
.broadcastThrough(
saveFiltered(Path("testdata/celsius_over.txt"), { n => n >= 0 }),
saveFiltered(Path("testdata/celsius_below.txt"), { n => n < 0 })
)
}
def run: IO[Unit] =
converter.compile.drain
}
saveFiltered
现在是一个返回 Pipe
的函数,它是使用文件名和谓词构建的。此函数用于为 broadcastThrough
构建两个参数。我用一个小例子测试了它,FWIW 它按预期工作。
broadcastThrough
保证流中的所有元素都发送到所有管道。 Scaladoc 中提到了一个小警告:最慢的管道将导致整个流变慢。我认为在这种特殊情况下这不是问题,因为我猜这两个管道都同样快。
你甚至可以更进一步,稍微概括一下这个想法:
def partition[F[_] : Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
_.broadcastThrough[F, B](
_.filter(predicate).through(in),
_.filter(a => !predicate(a)).through(out)
)
有了它,您不必确保两个谓词产生互斥的结果。
稍作调整saveFiltered
:
def saveFiltered2(filename: Path): Pipe[IO, Double, Unit] =
_.map(_.toString)
.through(text.utf8.encode)
.through(Files[IO].writeAll(filename))
流的最后部分有点短:
...
.through(
partition(n => n >= 0,
saveFiltered2(Path("testdata/celsius_over.txt")),
saveFiltered2(Path("testdata/celsius_below.txt"))))```
我刚刚开始使用 fs2 流进行冒险。我想要实现的是读取一个文件(一个大文件,这就是我使用 fs2 的原因),对其进行转换并将结果写入两个不同的文件(基于某些谓词)。一些代码(来自 https://github.com/typelevel/fs2),附上我的评论:
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
/* instead of the last line I want something like this:
.through(<write temperatures higher than 10 to one file, the rest to the other one>)
*/
}
最有效的方法是什么?显而易见的解决方案是让两个流具有不同的过滤器,但效率低下(将有两次通过)。
不幸的是,据我所知,没有简单的方法可以将 fs2 流一分为二。
您可以做的是通过将值推送到两个队列之一来拆分您的流(第一个队列用于小于 10 的值,第二个用于大于或等于 10 的值)。如果我们使用 NoneTerminatedQueue
那么队列将不会终止,直到我们将 None
放入其中。然后我们可以只使用 dequeue
创建单独的流,直到队列没有关闭。
下面的示例解决方案。我将写入文件和读取分成不同的方法:
import java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import fs2.{Stream, io, text}
object FahrenheitToCelsius extends IOApp {
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0 / 9.0)
//I split reading into separate method
def read(blocker: Blocker, over: NoneTerminatedQueue[IO, Double], under: NoneTerminatedQueue[IO, Double]) = io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble))
.evalMap { value =>
if (value > 10) { //here we put values to one of queues
over.enqueue1(Some(value)) //until we put some queues are not close
} else {
under.enqueue1(Some(value))
}
}
.onFinalize(
over.enqueue1(None) *> under.enqueue1(None) //by putting None we terminate queues
)
//function write takes as argument source queue and target file
def write(s: Stream[IO, Double], blocker: Blocker, fileName: String): Stream[IO, Unit] = {
s.map(_.toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get(fileName), blocker))
}
val converter: Stream[IO, Unit] = for {
over <- Stream.eval(Queue.noneTerminated[IO, Double]) //here we create 2 queues
under <- Stream.eval(Queue.noneTerminated[IO, Double])
blocker <- Stream.resource(Blocker[IO])
_ <- write(over.dequeue, blocker, "testdata/celsius-over.txt") //we run reading and writing to both
.concurrently(write(under.dequeue, blocker, "testdata/celsius-under.txt")) //files concurrently
.concurrently(read(blocker, over, under)) //stream runs until queue over is not terminated
} yield ()
override def run(args: List[String]): IO[ExitCode] =
converter
.compile
.drain
.as(ExitCode.Success)
}
我找到了另一个解决方案。这是:
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import fs2.io.file.WriteCursor
import java.nio.file.Paths
object Converter extends IOApp {
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
def saveFiltered(in: Stream[IO,Double], blocker: cats.effect.Blocker, filename: String, filter: Double => Boolean) = {
val processed = in.filter(filter).intersperse("\n").map(_.toString).through(text.utf8Encode)
Stream.resource(WriteCursor.fromPath[IO](Paths.get(filename), blocker)).flatMap(_.writeAll(processed).void.stream)
}
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble))
.observe( in => saveFiltered(in, blocker, "testdata/celsius_over.txt", {n => n >= 0}) )
.through( in => saveFiltered(in, blocker, "testdata/celsius_below.txt", {n => n < 0}) )
}
def run(args: List[String]): IO[ExitCode] =
converter.compile.drain.as(ExitCode.Success)
}
我认为它比涉及队列的答案更容易理解(尽管队列似乎是类似情况的常见解决方案)。
也可以使用 broadcastThrough
,它允许将流的所有元素广播到多个管道。
你的问题的完整解决方案可能如下所示(使用 cats effect 3.3.8 和 fs2 3.2.5。这就是为什么它看起来有点不同但无论版本如何,主要思想都是相同的):
import cats.effect.{IO, IOApp}
import fs2.io.file.{Files, Path}
import fs2.{Pipe, Stream, text}
object Converter extends IOApp.Simple {
val converter: Stream[IO, Unit] = {
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0 / 9.0)
def saveFiltered(filename: Path, predicate: Double => Boolean): Pipe[IO, Double, Unit] =
_.filter(predicate)
.map(_.toString)
.through(text.utf8.encode)
.through(Files[IO].writeAll(filename))
Files[IO].readAll(Path("testdata/fahrenheit.txt"))
.through(text.utf8.decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble))
.broadcastThrough(
saveFiltered(Path("testdata/celsius_over.txt"), { n => n >= 0 }),
saveFiltered(Path("testdata/celsius_below.txt"), { n => n < 0 })
)
}
def run: IO[Unit] =
converter.compile.drain
}
saveFiltered
现在是一个返回 Pipe
的函数,它是使用文件名和谓词构建的。此函数用于为 broadcastThrough
构建两个参数。我用一个小例子测试了它,FWIW 它按预期工作。
broadcastThrough
保证流中的所有元素都发送到所有管道。 Scaladoc 中提到了一个小警告:最慢的管道将导致整个流变慢。我认为在这种特殊情况下这不是问题,因为我猜这两个管道都同样快。
你甚至可以更进一步,稍微概括一下这个想法:
def partition[F[_] : Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
_.broadcastThrough[F, B](
_.filter(predicate).through(in),
_.filter(a => !predicate(a)).through(out)
)
有了它,您不必确保两个谓词产生互斥的结果。
稍作调整saveFiltered
:
def saveFiltered2(filename: Path): Pipe[IO, Double, Unit] =
_.map(_.toString)
.through(text.utf8.encode)
.through(Files[IO].writeAll(filename))
流的最后部分有点短:
...
.through(
partition(n => n >= 0,
saveFiltered2(Path("testdata/celsius_over.txt")),
saveFiltered2(Path("testdata/celsius_below.txt"))))```