如果另一个任务失败,如何组合两个并行任务以取消一个任务?
How to compose two parallel Tasks to cancel one task if another one fails?
我想用 实现我的异步处理
scalaz.concurrent.Task。我需要一个函数 (Task[A], Task[B]) => Task[(A, B)]
到 return 一个新任务,其工作方式如下:
- 运行
Task[A]
和Task[B]
并行等待结果;
- 如果其中一项任务失败,则取消第二个任务并等待它终止;
- return两项任务的结果。
你会如何实现这样的功能?
java 开发人员有一个奇怪的想法,即您不应该取消并行任务。他们 comminate Thread.stop()
并将其标记为已弃用。没有 Thread.stop()
你就无法真正取消未来。您所能做的就是向未来发送一些信号,或者修改一些共享变量并在未来内部编写代码以定期检查它。因此,所有提供 future 的图书馆都可以建议取消 future 的唯一方法:合作。
我现在面临同样的问题,正在为可以取消的期货编写自己的库。有一些困难,但可以解决。您只是不能在任意位置调用 Thread.stop() 。线程可以执行更新共享变量。锁会被正常召回,但更新可能会中途停止,例如只更新 double 值的一半等等。所以我要介绍一些锁。如果线程处于受保护状态,那么它现在应该被 Thread.stop() 杀死,但会发送特定消息。守卫状态被认为总是非常快等待。所有其他时间,在计算过程中,线程可以安全地停止并替换为新线程。
所以,答案是:你不应该希望取消期货,否则你就是异端,java 社区中没有人愿意帮助你。你应该定义你自己的可以杀死线程的执行上下文,你应该在这个上下文
上将你自己的期货库写到运行
正如我上面提到的,如果你不关心实际停止未失败的计算,你可以使用 Nondeterminism
。例如:
import scalaz._, scalaz.Scalaz._, scalaz.concurrent._
def pairFailSlow[A, B](a: Task[A], b: Task[B]): Task[(A, B)] = a.tuple(b)
def pairFailFast[A, B](a: Task[A], b: Task[B]): Task[(A, B)] =
Nondeterminism[Task].both(a, b)
val divByZero: Task[Int] = Task(1 / 0)
val waitALongTime: Task[String] = Task {
Thread.sleep(10000)
println("foo")
"foo"
}
然后:
pairFailSlow(divByZero, waitALongTime).run // fails immediately
pairFailSlow(waitALongTime, divByZero).run // hangs while sleeping
pairFailFast(divByZero, waitALongTime).run // fails immediately
pairFailFast(waitALongTime, divByZero).run // fails immediately
除第一种情况外,waitALongTime
中的副作用都会发生。如果您想尝试停止该计算,则需要使用类似 Task
的 runAsyncInterruptibly
.
我想用 实现我的异步处理
scalaz.concurrent.Task。我需要一个函数 (Task[A], Task[B]) => Task[(A, B)]
到 return 一个新任务,其工作方式如下:
- 运行
Task[A]
和Task[B]
并行等待结果; - 如果其中一项任务失败,则取消第二个任务并等待它终止;
- return两项任务的结果。
你会如何实现这样的功能?
java 开发人员有一个奇怪的想法,即您不应该取消并行任务。他们 comminate Thread.stop()
并将其标记为已弃用。没有 Thread.stop()
你就无法真正取消未来。您所能做的就是向未来发送一些信号,或者修改一些共享变量并在未来内部编写代码以定期检查它。因此,所有提供 future 的图书馆都可以建议取消 future 的唯一方法:合作。
我现在面临同样的问题,正在为可以取消的期货编写自己的库。有一些困难,但可以解决。您只是不能在任意位置调用 Thread.stop() 。线程可以执行更新共享变量。锁会被正常召回,但更新可能会中途停止,例如只更新 double 值的一半等等。所以我要介绍一些锁。如果线程处于受保护状态,那么它现在应该被 Thread.stop() 杀死,但会发送特定消息。守卫状态被认为总是非常快等待。所有其他时间,在计算过程中,线程可以安全地停止并替换为新线程。
所以,答案是:你不应该希望取消期货,否则你就是异端,java 社区中没有人愿意帮助你。你应该定义你自己的可以杀死线程的执行上下文,你应该在这个上下文
上将你自己的期货库写到运行正如我上面提到的,如果你不关心实际停止未失败的计算,你可以使用 Nondeterminism
。例如:
import scalaz._, scalaz.Scalaz._, scalaz.concurrent._
def pairFailSlow[A, B](a: Task[A], b: Task[B]): Task[(A, B)] = a.tuple(b)
def pairFailFast[A, B](a: Task[A], b: Task[B]): Task[(A, B)] =
Nondeterminism[Task].both(a, b)
val divByZero: Task[Int] = Task(1 / 0)
val waitALongTime: Task[String] = Task {
Thread.sleep(10000)
println("foo")
"foo"
}
然后:
pairFailSlow(divByZero, waitALongTime).run // fails immediately
pairFailSlow(waitALongTime, divByZero).run // hangs while sleeping
pairFailFast(divByZero, waitALongTime).run // fails immediately
pairFailFast(waitALongTime, divByZero).run // fails immediately
除第一种情况外,waitALongTime
中的副作用都会发生。如果您想尝试停止该计算,则需要使用类似 Task
的 runAsyncInterruptibly
.