由 mvar 引起的竞争条件
Race condition caused by mvar
考虑以下简单示例:
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val mvarF = MVar.of[IO, mutable.Map[Int, Int]](mutable.Map.empty)
mvarF.flatMap(mvar =>
mvar.take.bracket(st => {
IO(st.put(1, 1)) >> (IO.sleep(2.seconds) >> IO(st.clear())).start
})(mvar.put)
>>
mvar.take.bracket(st =>
IO(println(s"Size before sleep ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after sleep ${st.size}"))
)(mvar.put)
).unsafeRunSync()
它打印:
Size before sleep 1
Size after sleep 0
在这个例子中,在fiber下调度的作业修改了mvar下的对象,该对象被另一个作业获取。
这非常不安全。有没有办法禁止这种用法?
正如 Jasper 所指出的,您的主要问题(对于此特定代码示例)是您在通过调用 IO(st.put(1, 1)) >> (IO.sleep(2.seconds) >> IO(st.clear())).start
在 use
中启动新光纤后释放 bracket
.所以你的使用实际上是一个 mutable.Map[Int, Int] => IO[Fiber[IO, Unit]]
.
您只需要删除那个 start
并且您将有预期的行为(您的使用将是 mutable.Map[Int, Int] => IO[Unit]
并且 bracket
不会被释放,除非您的 use
IO
完成。)。这意味着地图对于两个打印操作都将是空的。
mvarF.flatMap(mvar =>
mvar.take.bracket(st => {
IO(st.put(1, 1)) >>
IO.sleep(2.seconds) >>
IO(st.clear())
})(mvar.put)
>>
mvar.take.bracket(st =>
IO(println(s"Size before sleep ${st.size}")) >>
IO.sleep(2.seconds) >>
IO(println(s"Size after sleep ${st.size}"))
)(mvar.put)
).unsafeRunSync()
Size before sleep 0
Size after sleep 0
但这实际上只是这个特定代码示例的巧合(IO 与 flatMap 链接在一起,这意味着我们告诉运行时按顺序执行这些 IO)。
MVar
让您可以控制变量的重新分配,但您根本没有进行任何重新分配。因此,这段代码甚至没有使用 MVar
的任何功能,它只是作为旁观者坐在那里。
因此,以这种方式使用 MVar
对代码的线程安全性影响为零。
mvarF.flatMap(mvar =>
mvar.take.bracket(st =>
IO(println(s"Size before first sleep - ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after first sleep - ${st.size}"))
)(mvar.put)
).unsafeRunAsyncAndForget()
mvarF.flatMap(mvar =>
mvar.take.bracket(st => {
IO(st.put(1, 1)) >> IO.sleep(2.seconds) >> IO(st.clear())
})(mvar.put)
).unsafeRunAsyncAndForget()
mvarF.flatMap(mvar =>
mvar.take.bracket(st =>
IO(println(s"Size before second sleep - ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after second sleep - ${st.size}"))
)(mvar.put)
).unsafeRunAsyncAndForget()
Size before first sleep - 0
Size before second sleep - 1
Size after first sleep - 0
Size after second sleep - 0
您可以使用 Semaphore 获得无竞争范围。
class IOWithSemaphore[A](
private val a: A,
private val semaphore: Semaphore[IO]
)(
implicit
F: Concurrent[IO],
T: Timer[IO]) {
def unitUse(use: A => IO[Unit]): IO[Unit] =
for {
_ <- semaphore.acquire
_ <- use(a)
_ <- semaphore.release
} yield ()
}
val map = mutable.Map.empty[Int, Int]
Semaphore[IO](1).map(semaphore => {
val mapIOWithSemaphore = new IOWithSemaphore[mutable.Map[Int, Int]](map, semaphore)
// using unsafeRunAsync to emulate the parallel usage
mapIOWithSemaphore.unitUse(map =>
IO(println(s"Size before first sleep - ${map.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after first sleep - ${map.size}"))
).unsafeRunAsyncAndForget()
mapIOWithSemaphore.unitUse(map =>
IO(println(s"MUTATION BEGIN")) >> IO(map.put(1, 1)) >> IO.sleep(2.seconds) >> IO(map.clear()) >> IO(println(s"MUTATION END"))
).unsafeRunAsyncAndForget()
mapIOWithSemaphore.unitUse(map =>
IO(println(s"Size before second sleep - ${map.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after second sleep - ${map.size}"))
).unsafeRunAsyncAndForget()
}).unsafeRunAsyncAndForget()
Await.result(Promise[Unit].future, Duration.Inf)
Size before first sleep - 0
Size after first sleep - 0
MUTATION BEGIN
MUTATION END
Size before second sleep - 0
Size after second sleep - 0
考虑以下简单示例:
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val mvarF = MVar.of[IO, mutable.Map[Int, Int]](mutable.Map.empty)
mvarF.flatMap(mvar =>
mvar.take.bracket(st => {
IO(st.put(1, 1)) >> (IO.sleep(2.seconds) >> IO(st.clear())).start
})(mvar.put)
>>
mvar.take.bracket(st =>
IO(println(s"Size before sleep ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after sleep ${st.size}"))
)(mvar.put)
).unsafeRunSync()
它打印:
Size before sleep 1
Size after sleep 0
在这个例子中,在fiber下调度的作业修改了mvar下的对象,该对象被另一个作业获取。
这非常不安全。有没有办法禁止这种用法?
正如 Jasper 所指出的,您的主要问题(对于此特定代码示例)是您在通过调用 IO(st.put(1, 1)) >> (IO.sleep(2.seconds) >> IO(st.clear())).start
在 use
中启动新光纤后释放 bracket
.所以你的使用实际上是一个 mutable.Map[Int, Int] => IO[Fiber[IO, Unit]]
.
您只需要删除那个 start
并且您将有预期的行为(您的使用将是 mutable.Map[Int, Int] => IO[Unit]
并且 bracket
不会被释放,除非您的 use
IO
完成。)。这意味着地图对于两个打印操作都将是空的。
mvarF.flatMap(mvar =>
mvar.take.bracket(st => {
IO(st.put(1, 1)) >>
IO.sleep(2.seconds) >>
IO(st.clear())
})(mvar.put)
>>
mvar.take.bracket(st =>
IO(println(s"Size before sleep ${st.size}")) >>
IO.sleep(2.seconds) >>
IO(println(s"Size after sleep ${st.size}"))
)(mvar.put)
).unsafeRunSync()
Size before sleep 0
Size after sleep 0
但这实际上只是这个特定代码示例的巧合(IO 与 flatMap 链接在一起,这意味着我们告诉运行时按顺序执行这些 IO)。
MVar
让您可以控制变量的重新分配,但您根本没有进行任何重新分配。因此,这段代码甚至没有使用 MVar
的任何功能,它只是作为旁观者坐在那里。
因此,以这种方式使用 MVar
对代码的线程安全性影响为零。
mvarF.flatMap(mvar =>
mvar.take.bracket(st =>
IO(println(s"Size before first sleep - ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after first sleep - ${st.size}"))
)(mvar.put)
).unsafeRunAsyncAndForget()
mvarF.flatMap(mvar =>
mvar.take.bracket(st => {
IO(st.put(1, 1)) >> IO.sleep(2.seconds) >> IO(st.clear())
})(mvar.put)
).unsafeRunAsyncAndForget()
mvarF.flatMap(mvar =>
mvar.take.bracket(st =>
IO(println(s"Size before second sleep - ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after second sleep - ${st.size}"))
)(mvar.put)
).unsafeRunAsyncAndForget()
Size before first sleep - 0
Size before second sleep - 1
Size after first sleep - 0
Size after second sleep - 0
您可以使用 Semaphore 获得无竞争范围。
class IOWithSemaphore[A](
private val a: A,
private val semaphore: Semaphore[IO]
)(
implicit
F: Concurrent[IO],
T: Timer[IO]) {
def unitUse(use: A => IO[Unit]): IO[Unit] =
for {
_ <- semaphore.acquire
_ <- use(a)
_ <- semaphore.release
} yield ()
}
val map = mutable.Map.empty[Int, Int]
Semaphore[IO](1).map(semaphore => {
val mapIOWithSemaphore = new IOWithSemaphore[mutable.Map[Int, Int]](map, semaphore)
// using unsafeRunAsync to emulate the parallel usage
mapIOWithSemaphore.unitUse(map =>
IO(println(s"Size before first sleep - ${map.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after first sleep - ${map.size}"))
).unsafeRunAsyncAndForget()
mapIOWithSemaphore.unitUse(map =>
IO(println(s"MUTATION BEGIN")) >> IO(map.put(1, 1)) >> IO.sleep(2.seconds) >> IO(map.clear()) >> IO(println(s"MUTATION END"))
).unsafeRunAsyncAndForget()
mapIOWithSemaphore.unitUse(map =>
IO(println(s"Size before second sleep - ${map.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after second sleep - ${map.size}"))
).unsafeRunAsyncAndForget()
}).unsafeRunAsyncAndForget()
Await.result(Promise[Unit].future, Duration.Inf)
Size before first sleep - 0
Size after first sleep - 0
MUTATION BEGIN
MUTATION END
Size before second sleep - 0
Size after second sleep - 0