了解 cats.effect.Concurrent 关于取消
Understanding cats.effect.Concurrent with respect to Cancellation
鉴于:
build.sbt
scalaVersion := "2.13.2"
libraryDependencies += "org.typelevel" %% "cats-effect" % "2.1.3"
src/main/scala/net/Main.scala
package net
import cats.effect._
import cats.implicits._
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
object App extends IOApp { self: IOApp =>
override def run(args: List[String]): IO[ExitCode] =
for {
_ <- uncancellable
_ <- notUncancellable
} yield ExitCode.Success
private def uncancellable: IO[Unit] = {
val tick: IO[Unit] = Concurrent[IO].uncancelable(self.timer.sleep(10.seconds))
for {
_ <- IO(println("uncancellable"))
fiber <- Concurrent[IO].start(tick)
_ <- IO(println("seconds begin: " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
_ <- fiber.cancel
_ <- fiber.join
_ <- IO(println("seconds done : " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
} yield ()
}
private def notUncancellable: IO[Unit] = {
val tick: IO[Unit] = self.timer.sleep(10.seconds)
for {
_ <- IO(println("notUncancellable"))
fiber <- Concurrent[IO].start(tick)
_ <- IO(println("seconds begin: " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
_ <- fiber.cancel
_ <- fiber.join
_ <- IO(println("seconds done : " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
} yield ()
}
}
运行 它显示以下输出:
sbt:cats-effect-cancellation-question> run
[info] Compiling 1 Scala source to /Users/kevinmeredith/Workspace/cats-effect-cancellation-questions/target/scala-2.13/classes ...
[info] Done compiling.
[info] Packaging /Users/kevinmeredith/Workspace/cats-effect-cancellation-questions/target/scala-2.13/cats-effect-cancellation-question_2.13-0.1.jar ...
[info] Done packaging.
[info] Running net.App
uncancellable
seconds begin: 303045
seconds done : 303055
notUncancellable
seconds begin: 303055
^C$
请注意,大约 30 秒后,我取消了它。
为什么 "seconds done :
没有打印出来:
notUncancellable
seconds begin: 303055
^C$
?
我相信 uncancellable
是不言自明的。
在不可取消的情况下,您的情况类似于 this GitHub issue。
正如 Alexandru Nedelcu 所说:
fiber.cancel
makes fiber.join
to be non-terminating in the case of IO. Therefore fiber.join
will never complete and that guarantee never gets a chance to be evaluated.
You can force an evaluation if you cancel that too, which in a real app you'd need to do if you cared for the result of that fiber.join
.
据我所知,这是对 the contract
的一种可能解释
/**
* Returns a new task that will await for the completion of the
* underlying fiber, (asynchronously) blocking the current run-loop
* until that result is available.
*/
def join: F[A]
取消的光纤不能 return 成功值 - 这是显而易见的。但是,如果它 returned 另一个失败的异常...它也会 returned 一个值,它可以被认为是 fiber 计算的值——它不应该 return 任何值,因为它被取消了!
出于这个原因,在这种情况下,您的整个线程都在等待一个从未到达的值。
为了避免此类陷阱,您可以使用不那么“低级”的内容,例如 racePair
或类似的东西,这样可以避免您自己处理此类问题。您可以阅读 Oleg Pyzhcov 关于 fiber safety.
的简短 post
鉴于:
build.sbt
scalaVersion := "2.13.2"
libraryDependencies += "org.typelevel" %% "cats-effect" % "2.1.3"
src/main/scala/net/Main.scala
package net
import cats.effect._
import cats.implicits._
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
object App extends IOApp { self: IOApp =>
override def run(args: List[String]): IO[ExitCode] =
for {
_ <- uncancellable
_ <- notUncancellable
} yield ExitCode.Success
private def uncancellable: IO[Unit] = {
val tick: IO[Unit] = Concurrent[IO].uncancelable(self.timer.sleep(10.seconds))
for {
_ <- IO(println("uncancellable"))
fiber <- Concurrent[IO].start(tick)
_ <- IO(println("seconds begin: " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
_ <- fiber.cancel
_ <- fiber.join
_ <- IO(println("seconds done : " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
} yield ()
}
private def notUncancellable: IO[Unit] = {
val tick: IO[Unit] = self.timer.sleep(10.seconds)
for {
_ <- IO(println("notUncancellable"))
fiber <- Concurrent[IO].start(tick)
_ <- IO(println("seconds begin: " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
_ <- fiber.cancel
_ <- fiber.join
_ <- IO(println("seconds done : " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
} yield ()
}
}
运行 它显示以下输出:
sbt:cats-effect-cancellation-question> run
[info] Compiling 1 Scala source to /Users/kevinmeredith/Workspace/cats-effect-cancellation-questions/target/scala-2.13/classes ...
[info] Done compiling.
[info] Packaging /Users/kevinmeredith/Workspace/cats-effect-cancellation-questions/target/scala-2.13/cats-effect-cancellation-question_2.13-0.1.jar ...
[info] Done packaging.
[info] Running net.App
uncancellable
seconds begin: 303045
seconds done : 303055
notUncancellable
seconds begin: 303055
^C$
请注意,大约 30 秒后,我取消了它。
为什么 "seconds done :
没有打印出来:
notUncancellable
seconds begin: 303055
^C$
?
uncancellable
是不言自明的。
在不可取消的情况下,您的情况类似于 this GitHub issue。
正如 Alexandru Nedelcu 所说:
fiber.cancel
makesfiber.join
to be non-terminating in the case of IO. Thereforefiber.join
will never complete and that guarantee never gets a chance to be evaluated.You can force an evaluation if you cancel that too, which in a real app you'd need to do if you cared for the result of that
fiber.join
.
据我所知,这是对 the contract
的一种可能解释 /**
* Returns a new task that will await for the completion of the
* underlying fiber, (asynchronously) blocking the current run-loop
* until that result is available.
*/
def join: F[A]
取消的光纤不能 return 成功值 - 这是显而易见的。但是,如果它 returned 另一个失败的异常...它也会 returned 一个值,它可以被认为是 fiber 计算的值——它不应该 return 任何值,因为它被取消了!
出于这个原因,在这种情况下,您的整个线程都在等待一个从未到达的值。
为了避免此类陷阱,您可以使用不那么“低级”的内容,例如 racePair
或类似的东西,这样可以避免您自己处理此类问题。您可以阅读 Oleg Pyzhcov 关于 fiber safety.