所有 Scala 期货已完成
All Scala futures completed
我有一个 akka actor,其中我在 scala 中有几个 futures,它们只做一些额外的工作并将计算保存到共享变量中。主要工作完成后,我想等待这些 'extra' 个期货并回复一个答案。简单的代码是这样的:
....
val replyTo = sender()
var informationCollector = InformationCollector()
val mainFuture: Future[GetMainInformation] = ...
val aFuture: Future[GetExtraInformationA] =...
val bFuture: Future[GetExtraInformationB] =...
val cFuture: Future[GetExtraInformationC] =...
// now I want all the futures to write their result
// and save it into the informationCollector and then
// after all have executed, reply back to Sender all the collected Data.
使用这种方法,我遇到了 2 个问题。我不确定在期货的 onSuccess 调用中访问 InformationCollector 是否安全,我也想让它易于扩展(因此可以轻松添加其他收集器)。
直到现在,我想出了这个解决方案,但我不确定它是否正确
// IMPORTANT: firstly add map for all of the extraInfoFutures to add that information into collector
// this will create aFutureMapped, bFutureMapped, cFutureMapped
roomFuture onComplete {
case Success(mainInfo: GetMainInformation) => {
informationCollector = informationCollector.copy(mainInformation=mainInfo)
Future.sequence(List(aFutureMapped,bFutureMapped,cFutureMapped)) onComplete { _ =>
replyTo ! informationCollector
}
}
}
非常感谢任何帮助。
我不确定我是否完全理解你的问题,但我至少可以为第一个问题提供答案。
访问 onComplete
中的可变状态可能不安全。实际上,它使演员的全部目的无效。如果你访问可变状态,你应该总是通过参与者的邮箱来访问,例如self ! ModifyState(newValue)
.
这里的问题是 onComplete
方法与 actor 不在同一个线程中,这导致了两个线程可能同时修改相同数据的情况,因此改变状态具有与那里相同的所有问题通常处于状态的并发突变(竞争条件等)。
对于第二期,我只是通过self ! TypeAExtraValue(someValue)
从不同类型的额外期货中收集价值,然后在收到所有额外价值后发送收集器(例如TypeAInfoCollector
)。正确使用 actor 可以解决这个问题,因为它们一次只处理一条消息。
代码可能如下所示:
val extraInfoFuture = getExtraInfo()
val infoCollector = new Collector()
extraInfoFuture onComplete {
case Success(extraInfo) => self ! ExtraInfo(extraInfo)
}
...
def receive = {
case ExtraInfo(info) => {
infoCollector.collect(info)
if(infoCollector.collectedAll) sender() ! infoCollector
}
}
我有一个 akka actor,其中我在 scala 中有几个 futures,它们只做一些额外的工作并将计算保存到共享变量中。主要工作完成后,我想等待这些 'extra' 个期货并回复一个答案。简单的代码是这样的:
....
val replyTo = sender()
var informationCollector = InformationCollector()
val mainFuture: Future[GetMainInformation] = ...
val aFuture: Future[GetExtraInformationA] =...
val bFuture: Future[GetExtraInformationB] =...
val cFuture: Future[GetExtraInformationC] =...
// now I want all the futures to write their result
// and save it into the informationCollector and then
// after all have executed, reply back to Sender all the collected Data.
使用这种方法,我遇到了 2 个问题。我不确定在期货的 onSuccess 调用中访问 InformationCollector 是否安全,我也想让它易于扩展(因此可以轻松添加其他收集器)。
直到现在,我想出了这个解决方案,但我不确定它是否正确
// IMPORTANT: firstly add map for all of the extraInfoFutures to add that information into collector
// this will create aFutureMapped, bFutureMapped, cFutureMapped
roomFuture onComplete {
case Success(mainInfo: GetMainInformation) => {
informationCollector = informationCollector.copy(mainInformation=mainInfo)
Future.sequence(List(aFutureMapped,bFutureMapped,cFutureMapped)) onComplete { _ =>
replyTo ! informationCollector
}
}
}
非常感谢任何帮助。
我不确定我是否完全理解你的问题,但我至少可以为第一个问题提供答案。
访问 onComplete
中的可变状态可能不安全。实际上,它使演员的全部目的无效。如果你访问可变状态,你应该总是通过参与者的邮箱来访问,例如self ! ModifyState(newValue)
.
这里的问题是 onComplete
方法与 actor 不在同一个线程中,这导致了两个线程可能同时修改相同数据的情况,因此改变状态具有与那里相同的所有问题通常处于状态的并发突变(竞争条件等)。
对于第二期,我只是通过self ! TypeAExtraValue(someValue)
从不同类型的额外期货中收集价值,然后在收到所有额外价值后发送收集器(例如TypeAInfoCollector
)。正确使用 actor 可以解决这个问题,因为它们一次只处理一条消息。
代码可能如下所示:
val extraInfoFuture = getExtraInfo()
val infoCollector = new Collector()
extraInfoFuture onComplete {
case Success(extraInfo) => self ! ExtraInfo(extraInfo)
}
...
def receive = {
case ExtraInfo(info) => {
infoCollector.collect(info)
if(infoCollector.collectedAll) sender() ! infoCollector
}
}