参考更新和使用猫效应的光纤触发器
Ref Updates And Fiber Triggers Using Cats Effect
问题: 我正在尝试解决我需要每 x 分钟安排一次的问题,我需要更新缓存并且可以并发获取.
尝试过的解决方案:
- 使用带有猫效果的 TrieMap 和 ScheduledThreadPool 执行器:
我实际上是从使用 TrieMap 开始的,因为它提供线程安全并使用调度线程池来调度更新
import cats.Applicative.ops.toAllApplicativeOps
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, IO, IOApp}
import java.util.concurrent.{Executors, ScheduledExecutorService}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreTrieMap extends IOApp {
def callForEvery[A](f: => Unit, d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[Unit] = {
IO.cancelable {
cb =>
val r = new Runnable {
override def run(): Unit = cb(Right(f))
}
val scFut = sc.scheduleAtFixedRate(r, 0, d.length, d.unit)
IO(scFut.cancel(false)).void
}
}
val map = TrieMap.empty[String, String]
override def run(args: List[String]): IO[ExitCode] = {
implicit val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
for {
_ <- callForEvery(println(map.get("token")), 1 second)
_ <- callForEvery(println(map.put("token", Random.nextString(10))), 3 second)
} yield ExitCode.Success
}
}
- 使用 Ref 和猫效应纤维:
然后创建了一个纯猫效应解决方案。
下面这段代码会导致 Whosebug 错误吗?
import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreCatFiber extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
s <- scheduleAndPopulate(ref, 1 minute)
r <- keepPollingUsingFiber(ref)
_ <- s.join
_ <- r.join
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
(for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
rS <- scheduleAndPopulate(r, duration)(cs)
_ <- rS.join
} yield ()).start(cs)
}
def keepPollingUsingFiber(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
w <- keepPollingUsingFiber(r)(cs)
_ <- w.join
} yield ()).start(cs)
}
}
我正在尝试更新 Ref,并将 Ref 用作由另一个光纤更新的并发缓存。我正在使用递归触发纤程创建。我知道纤维可用于堆栈安全操作。在这种情况下,我将加入创建的旧光纤。所以想了解以下代码是否安全。
更新(下面提供的答案的解决方案)
第三种解决方案:基于其中一个答案的输入。与其为每个递归调用分叉,不如将其分叉给调用者。
import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreCatFiberWithIO extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
s <- scheduleAndPopulateWithIO(ref, 1 second).start
r <- keepPollingUsingIO(ref).start
_ <- s.join
_ <- r.join
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulateWithIO(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Unit] = {
for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
_ <- scheduleAndPopulateWithIO(r, duration)(cs)
} yield ()
}
def keepPollingUsingIO(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Unit] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
w <- keepPollingUsingIO(r)(cs)
} yield ())
}
}
很想知道上述方法的优缺点。
对于第二种方法,您可以通过不在 scheduleAndPopulate
和 keepPollingUsingFiber
中分叉 Fiber
来使其更简单。相反,保留递归调用,并将它们分叉到调用者中。 IO
是堆栈安全的,因此递归调用不会炸毁堆栈。
您可以使用 start
分叉每个,但 parTupled
它们可能更简单。它是 parMapN
的变体,它分叉每个效果并收集它们的结果。
(此外,在您的代码中您不需要传递隐式值,例如 cs
,编译器会显式地为您推断它们。)
object ExploreCatFiber extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
_ <- (scheduleAndPopulate(ref, 1 minute), keepPollingUsingFiber(ref)).parTupled
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration): IO[Unit] = {
(for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
_ <- scheduleAndPopulate(r, duration)
} yield ()
}
def keepPollingUsingFiber(r: Ref[IO, String]): IO[Unit] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
_ <- keepPollingUsingFiber(r)
} yield ()
}
}
问题: 我正在尝试解决我需要每 x 分钟安排一次的问题,我需要更新缓存并且可以并发获取.
尝试过的解决方案:
- 使用带有猫效果的 TrieMap 和 ScheduledThreadPool 执行器:
我实际上是从使用 TrieMap 开始的,因为它提供线程安全并使用调度线程池来调度更新
import cats.Applicative.ops.toAllApplicativeOps
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, IO, IOApp}
import java.util.concurrent.{Executors, ScheduledExecutorService}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreTrieMap extends IOApp {
def callForEvery[A](f: => Unit, d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[Unit] = {
IO.cancelable {
cb =>
val r = new Runnable {
override def run(): Unit = cb(Right(f))
}
val scFut = sc.scheduleAtFixedRate(r, 0, d.length, d.unit)
IO(scFut.cancel(false)).void
}
}
val map = TrieMap.empty[String, String]
override def run(args: List[String]): IO[ExitCode] = {
implicit val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
for {
_ <- callForEvery(println(map.get("token")), 1 second)
_ <- callForEvery(println(map.put("token", Random.nextString(10))), 3 second)
} yield ExitCode.Success
}
}
- 使用 Ref 和猫效应纤维:
然后创建了一个纯猫效应解决方案。
下面这段代码会导致 Whosebug 错误吗?
import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreCatFiber extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
s <- scheduleAndPopulate(ref, 1 minute)
r <- keepPollingUsingFiber(ref)
_ <- s.join
_ <- r.join
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
(for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
rS <- scheduleAndPopulate(r, duration)(cs)
_ <- rS.join
} yield ()).start(cs)
}
def keepPollingUsingFiber(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
w <- keepPollingUsingFiber(r)(cs)
_ <- w.join
} yield ()).start(cs)
}
}
我正在尝试更新 Ref,并将 Ref 用作由另一个光纤更新的并发缓存。我正在使用递归触发纤程创建。我知道纤维可用于堆栈安全操作。在这种情况下,我将加入创建的旧光纤。所以想了解以下代码是否安全。
更新(下面提供的答案的解决方案)
第三种解决方案:基于其中一个答案的输入。与其为每个递归调用分叉,不如将其分叉给调用者。
import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreCatFiberWithIO extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
s <- scheduleAndPopulateWithIO(ref, 1 second).start
r <- keepPollingUsingIO(ref).start
_ <- s.join
_ <- r.join
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulateWithIO(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Unit] = {
for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
_ <- scheduleAndPopulateWithIO(r, duration)(cs)
} yield ()
}
def keepPollingUsingIO(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Unit] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
w <- keepPollingUsingIO(r)(cs)
} yield ())
}
}
很想知道上述方法的优缺点。
对于第二种方法,您可以通过不在 scheduleAndPopulate
和 keepPollingUsingFiber
中分叉 Fiber
来使其更简单。相反,保留递归调用,并将它们分叉到调用者中。 IO
是堆栈安全的,因此递归调用不会炸毁堆栈。
您可以使用 start
分叉每个,但 parTupled
它们可能更简单。它是 parMapN
的变体,它分叉每个效果并收集它们的结果。
(此外,在您的代码中您不需要传递隐式值,例如 cs
,编译器会显式地为您推断它们。)
object ExploreCatFiber extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
_ <- (scheduleAndPopulate(ref, 1 minute), keepPollingUsingFiber(ref)).parTupled
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration): IO[Unit] = {
(for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
_ <- scheduleAndPopulate(r, duration)
} yield ()
}
def keepPollingUsingFiber(r: Ref[IO, String]): IO[Unit] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
_ <- keepPollingUsingFiber(r)
} yield ()
}
}