参考更新和使用猫效应的光纤触发器

Ref Updates And Fiber Triggers Using Cats Effect

问题: 我正在尝试解决我需要每 x 分钟安排一次的问题,我需要更新缓存并且可以并发获取.

尝试过的解决方案:

  1. 使用带有猫效果的 TrieMap 和 ScheduledThreadPool 执行器:

我实际上是从使用 TrieMap 开始的,因为它提供线程安全并使用调度线程池来调度更新

import cats.Applicative.ops.toAllApplicativeOps
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, IO, IOApp}

import java.util.concurrent.{Executors, ScheduledExecutorService}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

object ExploreTrieMap extends IOApp {
  def callForEvery[A](f: => Unit, d: FiniteDuration)
                     (implicit sc: ScheduledExecutorService): IO[Unit] = {
    IO.cancelable {
      cb =>
        val r = new Runnable {
          override def run(): Unit = cb(Right(f))
        }
        val scFut = sc.scheduleAtFixedRate(r, 0, d.length, d.unit)
        IO(scFut.cancel(false)).void
    }
  }

  val map = TrieMap.empty[String, String]
  override def run(args: List[String]): IO[ExitCode] = {
    implicit val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
    for {
      _ <- callForEvery(println(map.get("token")), 1 second)
      _ <- callForEvery(println(map.put("token", Random.nextString(10))), 3 second)
    } yield ExitCode.Success
  }
}

  1. 使用 Ref 和猫效应纤维:

然后创建了一个纯猫效应解决方案。

下面这段代码会导致 Whosebug 错误吗?

import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}

import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

object ExploreCatFiber extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO, String]("")
      s <- scheduleAndPopulate(ref, 1 minute)
      r <- keepPollingUsingFiber(ref)
      _ <- s.join
      _ <- r.join
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
    (for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      rS <- scheduleAndPopulate(r, duration)(cs)
      _ <- rS.join
    } yield ()).start(cs)
  }


  def keepPollingUsingFiber(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      w <- keepPollingUsingFiber(r)(cs)
      _ <- w.join
    } yield ()).start(cs)
  }
}

我正在尝试更新 Ref,并将 Ref 用作由另一个光纤更新的并发缓存。我正在使用递归触发纤程创建。我知道纤维可用于堆栈安全操作。在这种情况下,我将加入创建的旧光纤。所以想了解以下代码是否安全。

更新(下面提供的答案的解决方案)

第三种解决方案:基于其中一个答案的输入。与其为每个递归调用分叉,不如将其分叉给调用者。

import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}

import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

object ExploreCatFiberWithIO extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO, String]("")
      s <- scheduleAndPopulateWithIO(ref, 1 second).start
      r <- keepPollingUsingIO(ref).start
      _ <- s.join
      _ <- r.join
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulateWithIO(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Unit] = {
    for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      _ <- scheduleAndPopulateWithIO(r, duration)(cs)
    } yield ()
  }

  def keepPollingUsingIO(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Unit] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      w <- keepPollingUsingIO(r)(cs)
    } yield ())
  }
}

很想知道上述方法的优缺点。

对于第二种方法,您可以通过不在 scheduleAndPopulatekeepPollingUsingFiber 中分叉 Fiber 来使其更简单。相反,保留递归调用,并将它们分叉到调用者中。 IO 是堆栈安全的,因此递归调用不会炸毁堆栈。

您可以使用 start 分叉每个,但 parTupled 它们可能更简单。它是 parMapN 的变体,它分叉每个效果并收集它们的结果。

(此外,在您的代码中您不需要传递隐式值,例如 cs,编译器会显式地为您推断它们。)

object ExploreCatFiber extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO, String]("")
      _ <- (scheduleAndPopulate(ref, 1 minute), keepPollingUsingFiber(ref)).parTupled
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration): IO[Unit] = {
    (for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      _ <- scheduleAndPopulate(r, duration)
    } yield ()
  }

  def keepPollingUsingFiber(r: Ref[IO, String]): IO[Unit] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      _ <- keepPollingUsingFiber(r)
    } yield ()
  }
}