fs2 - 与 2 个流共享 Ref
fs2 - Sharing a Ref with 2 streams
我正在尝试在 2 个并发流之间共享 Ref[F, A]
。以下是实际场景的简化示例。
class Container[F[_]](implicit F: Sync[F]) {
private val counter = Ref[F].of(0)
def incrementBy2 = counter.flatMap(c => c.update(i => i + 2))
def printCounter = counter.flatMap(c => c.get.flatMap(i => F.delay(println(i))))
}
在主函数中,
object MyApp extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val s = for {
container <- Ref[IO].of(new Container[IO]())
} yield {
val incrementBy2 = Stream.repeatEval(
container.get
.flatTap(c => c.incrementBy2)
.flatMap(c => container.update(_ => c))
)
.metered(2.second)
.interruptScope
val printStream = Stream.repeatEval(
container.get
.flatMap(_.printCounter)
)
.metered(1.seconds)
incrementBy2.concurrently(printStream)
}
Stream.eval(s)
.flatten
.compile
.drain
.as(ExitCode.Success)
}
}
incrementBy2
所做的更新在 printStream
中不可见。
我怎样才能解决这个问题?
如果能帮助我理解这段代码中的错误,我将不胜感激。
谢谢
您的代码自 class 定义以来已损坏,您甚至没有更新相同的 Ref
请记住,IO
的要点是对计算的描述,因此 Ref[F].of(0)
returns 一个程序,在计算时将创建一个新的 Ref
,在其上调用多个 flatMaps
将导致创建多个 Refs
。
此外,您没有以正确的方式进行无标记最终处理(有些人可能会争辩说即使是正确的方式也不值得:https://alexn.org/blog/2022/04/18/scala-oop-design-sample/)
这就是我编写代码的方式:
trait Counter {
def incrementBy2: IO[Unit]
def printCounter: IO[Unit]
}
object Counter {
val inMemory: IO[Counter] =
IO.ref(0).map { ref =>
new Counter {
override final val incrementBy2: IO[Unit] =
ref.update(c => c + 2)
override final val printCounter: IO[Unit] =
ref.get.flatMap(IO.println)
}
}
}
object Program {
def run(counter: Counter): Stream[IO, Unit] =
Stream
.repeatEval(counter.printCounter)
.metered(1.second)
.concurrently(
Stream.repeatEval(counter.incrementBy2).metered(2.seconds)
).interruptAfter(10.seconds)
}
object Main extends IOApp.Simple {
override final val run: IO[Unit] =
Stream
.eval(Counter.inMemory)
.flatMap(Program.run)
.compile
.drain
}
PS: I would actually not have printCounter
but getCounter
and do the printing in the Program
可以看到代码运行 here.
我正在尝试在 2 个并发流之间共享 Ref[F, A]
。以下是实际场景的简化示例。
class Container[F[_]](implicit F: Sync[F]) {
private val counter = Ref[F].of(0)
def incrementBy2 = counter.flatMap(c => c.update(i => i + 2))
def printCounter = counter.flatMap(c => c.get.flatMap(i => F.delay(println(i))))
}
在主函数中,
object MyApp extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val s = for {
container <- Ref[IO].of(new Container[IO]())
} yield {
val incrementBy2 = Stream.repeatEval(
container.get
.flatTap(c => c.incrementBy2)
.flatMap(c => container.update(_ => c))
)
.metered(2.second)
.interruptScope
val printStream = Stream.repeatEval(
container.get
.flatMap(_.printCounter)
)
.metered(1.seconds)
incrementBy2.concurrently(printStream)
}
Stream.eval(s)
.flatten
.compile
.drain
.as(ExitCode.Success)
}
}
incrementBy2
所做的更新在 printStream
中不可见。
我怎样才能解决这个问题?
如果能帮助我理解这段代码中的错误,我将不胜感激。
谢谢
您的代码自 class 定义以来已损坏,您甚至没有更新相同的 Ref
请记住,IO
的要点是对计算的描述,因此 Ref[F].of(0)
returns 一个程序,在计算时将创建一个新的 Ref
,在其上调用多个 flatMaps
将导致创建多个 Refs
。
此外,您没有以正确的方式进行无标记最终处理(有些人可能会争辩说即使是正确的方式也不值得:https://alexn.org/blog/2022/04/18/scala-oop-design-sample/)
这就是我编写代码的方式:
trait Counter {
def incrementBy2: IO[Unit]
def printCounter: IO[Unit]
}
object Counter {
val inMemory: IO[Counter] =
IO.ref(0).map { ref =>
new Counter {
override final val incrementBy2: IO[Unit] =
ref.update(c => c + 2)
override final val printCounter: IO[Unit] =
ref.get.flatMap(IO.println)
}
}
}
object Program {
def run(counter: Counter): Stream[IO, Unit] =
Stream
.repeatEval(counter.printCounter)
.metered(1.second)
.concurrently(
Stream.repeatEval(counter.incrementBy2).metered(2.seconds)
).interruptAfter(10.seconds)
}
object Main extends IOApp.Simple {
override final val run: IO[Unit] =
Stream
.eval(Counter.inMemory)
.flatMap(Program.run)
.compile
.drain
}
PS: I would actually not have
printCounter
butgetCounter
and do the printing in theProgram
可以看到代码运行 here.