如果另一个任务失败,如何组合两个并行任务以取消一个任务?

How to compose two parallel Tasks to cancel one task if another one fails?

我想用 实现我的异步处理 scalaz.concurrent.Task。我需要一个函数 (Task[A], Task[B]) => Task[(A, B)] 到 return 一个新任务,其工作方式如下:

你会如何实现这样的功能?

java 开发人员有一个奇怪的想法,即您不应该取消并行任务。他们 comminate Thread.stop() 并将其标记为已弃用。没有 Thread.stop() 你就无法真正取消未来。您所能做的就是向未来发送一些信号,或者修改一些共享变量并在未来内部编写代码以定期检查它。因此,所有提供 future 的图书馆都可以建议取消 future 的唯一方法:合作。

我现在面临同样的问题,正在为可以取消的期货编写自己的库。有一些困难,但可以解决。您只是不能在任意位置调用 Thread.stop() 。线程可以执行更新共享变量。锁会被正常召回,但更新可能会中途停止,例如只更新 double 值的一半等等。所以我要介绍一些锁。如果线程处于受保护状态,那么它现在应该被 Thread.stop() 杀死,但会发送特定消息。守卫状态被认为总是非常快等待。所有其他时间,在计算过程中,线程可以安全地停止并替换为新线程。

所以,答案是:你不应该希望取消期货,否则你就是异端,java 社区中没有人愿意帮助你。你应该定义你自己的可以杀死线程的执行上下文,你应该在这个上下文

上将你自己的期货库写到运行

正如我上面提到的,如果你不关心实际停止未失败的计算,你可以使用 Nondeterminism。例如:

import scalaz._, scalaz.Scalaz._, scalaz.concurrent._

def pairFailSlow[A, B](a: Task[A], b: Task[B]): Task[(A, B)] = a.tuple(b)

def pairFailFast[A, B](a: Task[A], b: Task[B]): Task[(A, B)] =
  Nondeterminism[Task].both(a, b)

val divByZero: Task[Int] = Task(1 / 0)
val waitALongTime: Task[String] = Task {
  Thread.sleep(10000)
  println("foo")
  "foo"
}

然后:

pairFailSlow(divByZero, waitALongTime).run // fails immediately
pairFailSlow(waitALongTime, divByZero).run // hangs while sleeping
pairFailFast(divByZero, waitALongTime).run // fails immediately
pairFailFast(waitALongTime, divByZero).run // fails immediately

除第一种情况外,waitALongTime 中的副作用都会发生。如果您想尝试停止该计算,则需要使用类似 TaskrunAsyncInterruptibly.