将元素从队列中取出时写入文件:Scala fs2 Stream
Writing elements to a file as they are dequeued from the queue : Scala fs2 Stream
我对 fs2 流、进程元素进行了小测试,等待然后将它们写入文件。
我收到一个类型错误,我无法弄清楚它的意思:
错误:required: fs2.Stream[[x]cats.effect.IO[x],Unit] => fs2.Stream[[+A]cats.effect.IO[A],Unit],
found : [F[_]]fs2.Pipe[F,Byte,Unit]
导入 java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.io
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class StreamTypeIntToDouble(q: Queue[IO, Int])(implicit timer: Timer[IO]) {
import core.Processing._
val blocker: Blocker =
Blocker.liftExecutionContext(
scala.concurrent.ExecutionContext.Implicits.global
)
def storeInQueue: Stream[IO, Unit] = {
Stream(1, 2, 3)
.covary[IO]
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
.metered(Random.between(1, 20).seconds)
.through(q.enqueue)
}
def getFromQueue: Stream[IO, Unit] = {
q.dequeue
.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
.through(
io.file
.writeAll(Paths.get("file.txt"), blocker)
)
}
}
object Five extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, Int](10)
b = new StreamTypeIntToDouble(q)
_ <- b.storeInQueue.compile.drain.start
_ <- b.getFromQueue.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}
这里有几个问题,第一个是最令人困惑的。 writeAll
在其上下文 F[_]
中是多态的,但它需要 F
(以及 Sync
)的 ContextShift
实例。您当前在范围内没有 ContextShift[IO]
,因此编译器不会推断 writeAll
的 F
应该是 IO
。如果你添加这样的东西:
implicit val ioContextShift: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
…然后编译器将按您的预期推断出 IO
。
我对这种情况的建议是跳过类型推断。用类型参数写出来只是略微更冗长:
.through(
io.file
.writeAll[IO](Paths.get("file.txt"), blocker)
)
…这意味着您会收到有用的错误消息,例如缺少类型 class 实例。
一旦您解决了这个问题,还有其他几个问题。接下来是在这种情况下使用 evalMap
意味着您将拥有一个 ()
值流。如果将其更改为 evalTap
,日志记录副作用仍会适当发生,但不会丢失调用它的流的实际值。
最后一个问题是 writeAll
需要字节流,而您给它的是 Int
字节流。你想如何处理这种差异取决于预期的语义,但为了举例,像 .map(_.toByte)
这样的东西会让它编译。
我对 fs2 流、进程元素进行了小测试,等待然后将它们写入文件。 我收到一个类型错误,我无法弄清楚它的意思:
错误:required: fs2.Stream[[x]cats.effect.IO[x],Unit] => fs2.Stream[[+A]cats.effect.IO[A],Unit],
found : [F[_]]fs2.Pipe[F,Byte,Unit]
导入 java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.io
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class StreamTypeIntToDouble(q: Queue[IO, Int])(implicit timer: Timer[IO]) {
import core.Processing._
val blocker: Blocker =
Blocker.liftExecutionContext(
scala.concurrent.ExecutionContext.Implicits.global
)
def storeInQueue: Stream[IO, Unit] = {
Stream(1, 2, 3)
.covary[IO]
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
.metered(Random.between(1, 20).seconds)
.through(q.enqueue)
}
def getFromQueue: Stream[IO, Unit] = {
q.dequeue
.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
.through(
io.file
.writeAll(Paths.get("file.txt"), blocker)
)
}
}
object Five extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, Int](10)
b = new StreamTypeIntToDouble(q)
_ <- b.storeInQueue.compile.drain.start
_ <- b.getFromQueue.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}
这里有几个问题,第一个是最令人困惑的。 writeAll
在其上下文 F[_]
中是多态的,但它需要 F
(以及 Sync
)的 ContextShift
实例。您当前在范围内没有 ContextShift[IO]
,因此编译器不会推断 writeAll
的 F
应该是 IO
。如果你添加这样的东西:
implicit val ioContextShift: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
…然后编译器将按您的预期推断出 IO
。
我对这种情况的建议是跳过类型推断。用类型参数写出来只是略微更冗长:
.through(
io.file
.writeAll[IO](Paths.get("file.txt"), blocker)
)
…这意味着您会收到有用的错误消息,例如缺少类型 class 实例。
一旦您解决了这个问题,还有其他几个问题。接下来是在这种情况下使用 evalMap
意味着您将拥有一个 ()
值流。如果将其更改为 evalTap
,日志记录副作用仍会适当发生,但不会丢失调用它的流的实际值。
最后一个问题是 writeAll
需要字节流,而您给它的是 Int
字节流。你想如何处理这种差异取决于预期的语义,但为了举例,像 .map(_.toByte)
这样的东西会让它编译。