fs2 - 与 2 个流共享 Ref

fs2 - Sharing a Ref with 2 streams

我正在尝试在 2 个并发流之间共享 Ref[F, A]。以下是实际场景的简化示例。

  class Container[F[_]](implicit F: Sync[F]) {
    private val counter = Ref[F].of(0)

    def incrementBy2 = counter.flatMap(c => c.update(i => i + 2))

    def printCounter = counter.flatMap(c => c.get.flatMap(i => F.delay(println(i))))
  }

在主函数中,

object MyApp extends IOApp {

  def run(args: List[String]): IO[ExitCode] = {
    val s = for {
      container <- Ref[IO].of(new Container[IO]())
    } yield {
      val incrementBy2 = Stream.repeatEval(
          container.get
            .flatTap(c => c.incrementBy2)
            .flatMap(c => container.update(_ => c))
        )
        .metered(2.second)
        .interruptScope

      val printStream = Stream.repeatEval(
          container.get
            .flatMap(_.printCounter)
        )
        .metered(1.seconds)

      incrementBy2.concurrently(printStream)
    }
    Stream.eval(s)
      .flatten
      .compile
      .drain
      .as(ExitCode.Success)
  }
}

incrementBy2 所做的更新在 printStream 中不可见。 我怎样才能解决这个问题? 如果能帮助我理解这段代码中的错误,我将不胜感激。

谢谢

您的代码自 class 定义以来已损坏,您甚至没有更新相同的 Ref

请记住,IO 的要点是对计算的描述,因此 Ref[F].of(0) returns 一个程序,在计算时将创建一个新的 Ref,在其上调用多个 flatMaps 将导致创建多个 Refs

此外,您没有以正确的方式进行无标记最终处理(有些人可能会争辩说即使是正确的方式也不值得:https://alexn.org/blog/2022/04/18/scala-oop-design-sample/

这就是我编写代码的方式:

trait Counter {
  def incrementBy2: IO[Unit]
  def printCounter: IO[Unit]
}
object Counter {
  val inMemory: IO[Counter] =
    IO.ref(0).map { ref =>
      new Counter {
        override final val incrementBy2: IO[Unit] =
          ref.update(c => c + 2)
        
        override final val printCounter: IO[Unit] =
          ref.get.flatMap(IO.println)
      }
    }
}

object Program {
  def run(counter: Counter): Stream[IO, Unit] =
    Stream
      .repeatEval(counter.printCounter)
      .metered(1.second)
      .concurrently(
        Stream.repeatEval(counter.incrementBy2).metered(2.seconds)
      ).interruptAfter(10.seconds)
}

object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    Stream
      .eval(Counter.inMemory)
      .flatMap(Program.run)
      .compile
      .drain
}

PS: I would actually not have printCounter but getCounter and do the printing in the Program


可以看到代码运行 here.