由 mvar 引起的竞争条件

Race condition caused by mvar

考虑以下简单示例:

implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val mvarF = MVar.of[IO, mutable.Map[Int, Int]](mutable.Map.empty)

mvarF.flatMap(mvar =>

  mvar.take.bracket(st => {
    IO(st.put(1, 1)) >> (IO.sleep(2.seconds) >> IO(st.clear())).start
  })(mvar.put)
    >>
  mvar.take.bracket(st =>
    IO(println(s"Size before sleep ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after sleep ${st.size}"))
  )(mvar.put)

).unsafeRunSync()

它打印:

Size before sleep 1
Size after sleep 0

在这个例子中,在fiber下调度的作业修改了mvar下的对象,该对象被另一个作业获取。

这非常不安全。有没有办法禁止这种用法?

正如 Jasper 所指出的,您的主要问题(对于此特定代码示例)是您在通过调用 IO(st.put(1, 1)) >> (IO.sleep(2.seconds) >> IO(st.clear())).startuse 中启动新光纤后释放 bracket .所以你的使用实际上是一个 mutable.Map[Int, Int] => IO[Fiber[IO, Unit]].

您只需要删除那个 start 并且您将有预期的行为(您的使用将是 mutable.Map[Int, Int] => IO[Unit] 并且 bracket 不会被释放,除非您的 use IO 完成。)。这意味着地图对于两个打印操作都将是空的。

mvarF.flatMap(mvar =>
  mvar.take.bracket(st => {
    IO(st.put(1, 1)) >>
      IO.sleep(2.seconds) >>
        IO(st.clear())
  })(mvar.put)
    >>
    mvar.take.bracket(st =>
      IO(println(s"Size before sleep ${st.size}")) >>
        IO.sleep(2.seconds) >>
          IO(println(s"Size after sleep ${st.size}"))
    )(mvar.put)

).unsafeRunSync()
Size before sleep 0
Size after sleep 0

但这实际上只是这个特定代码示例的巧合(IO 与 flatMap 链接在一起,这意味着我们告诉运行时按顺序执行这些 IO)。

MVar 让您可以控制变量的重新分配,但您根本没有进行任何重新分配。因此,这段代码甚至没有使用 MVar 的任何功能,它只是作为旁观者坐在那里。

因此,以这种方式使用 MVar 对代码的线程安全性影响为零。

mvarF.flatMap(mvar =>
  mvar.take.bracket(st =>
    IO(println(s"Size before first sleep - ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after first sleep - ${st.size}"))
  )(mvar.put)
).unsafeRunAsyncAndForget()

mvarF.flatMap(mvar =>
  mvar.take.bracket(st => {
    IO(st.put(1, 1)) >> IO.sleep(2.seconds) >> IO(st.clear())
  })(mvar.put)
).unsafeRunAsyncAndForget()

mvarF.flatMap(mvar =>
    mvar.take.bracket(st =>
      IO(println(s"Size before second sleep - ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after second sleep - ${st.size}"))
    )(mvar.put)
).unsafeRunAsyncAndForget()
Size before first sleep - 0
Size before second sleep - 1
Size after first sleep - 0
Size after second sleep - 0

您可以使用 Semaphore 获得无竞争范围。

class IOWithSemaphore[A](
    private val a: A, 
    private val semaphore: Semaphore[IO]
  )(
    implicit
    F: Concurrent[IO],
    T: Timer[IO]) {

  def unitUse(use: A => IO[Unit]): IO[Unit] =
    for {
      _ <- semaphore.acquire
      _ <- use(a)
      _ <- semaphore.release
    } yield ()

}
val map = mutable.Map.empty[Int, Int]

Semaphore[IO](1).map(semaphore => {
  val mapIOWithSemaphore = new IOWithSemaphore[mutable.Map[Int, Int]](map, semaphore)

  // using unsafeRunAsync to emulate the parallel usage

  mapIOWithSemaphore.unitUse(map =>
    IO(println(s"Size before first sleep - ${map.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after first sleep - ${map.size}"))
  ).unsafeRunAsyncAndForget()

  mapIOWithSemaphore.unitUse(map =>
    IO(println(s"MUTATION BEGIN")) >> IO(map.put(1, 1)) >> IO.sleep(2.seconds) >> IO(map.clear()) >> IO(println(s"MUTATION END"))
  ).unsafeRunAsyncAndForget()

  mapIOWithSemaphore.unitUse(map =>
    IO(println(s"Size before second sleep - ${map.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after second sleep - ${map.size}"))
  ).unsafeRunAsyncAndForget()

}).unsafeRunAsyncAndForget()

Await.result(Promise[Unit].future, Duration.Inf)
Size before first sleep - 0
Size after first sleep - 0
MUTATION BEGIN
MUTATION END
Size before second sleep - 0
Size after second sleep - 0