所有 Scala 期货已完成

All Scala futures completed

我有一个 akka actor,其中我在 scala 中有几个 futures,它们只做一些额外的工作并将计算保存到共享变量中。主要工作完成后,我想等待这些 'extra' 个期货并回复一个答案。简单的代码是这样的:

....
val replyTo = sender()
var informationCollector = InformationCollector()
val mainFuture: Future[GetMainInformation] = ...
val aFuture: Future[GetExtraInformationA] =...
val bFuture: Future[GetExtraInformationB] =...
val cFuture: Future[GetExtraInformationC] =...

// now I want all the futures to write their result
// and save it into the informationCollector and then
// after all have executed, reply back to Sender all the collected Data.

使用这种方法,我遇到了 2 个问题。我不确定在期货的 onSuccess 调用中访问 InformationCollector 是否安全,我也想让它易于扩展(因此可以轻松添加其他收集器)。

直到现在,我想出了这个解决方案,但我不确定它是否正确

// IMPORTANT: firstly add map for all of the extraInfoFutures to add that information into collector
// this will create aFutureMapped, bFutureMapped, cFutureMapped
roomFuture onComplete {
      case Success(mainInfo: GetMainInformation) => {
           informationCollector = informationCollector.copy(mainInformation=mainInfo)
           Future.sequence(List(aFutureMapped,bFutureMapped,cFutureMapped)) onComplete { _ =>
               replyTo ! informationCollector
           }

      }
}

非常感谢任何帮助。

我不确定我是否完全理解你的问题,但我至少可以为第一个问题提供答案。

访问 onComplete 中的可变状态可能不安全。实际上,它使演员的全部目的无效。如果你访问可变状态,你应该总是通过参与者的邮箱来访问,例如self ! ModifyState(newValue).

这里的问题是 onComplete 方法与 actor 不在同一个线程中,这导致了两个线程可能同时修改相同数据的情况,因此改变状态具有与那里相同的所有问题通常处于状态的并发突变(竞争条件等)。

对于第二期,我只是通过self ! TypeAExtraValue(someValue)从不同类型的额外期货中收集价值,然后在收到所有额外价值后发送收集器(例如TypeAInfoCollector)。正确使用 actor 可以解决这个问题,因为它们一次只处理一条消息。

代码可能如下所示:

val extraInfoFuture = getExtraInfo()
val infoCollector = new Collector()

extraInfoFuture onComplete {
  case Success(extraInfo) => self ! ExtraInfo(extraInfo) 
}

...

def receive = {
  case ExtraInfo(info) => {
    infoCollector.collect(info)
    if(infoCollector.collectedAll) sender() ! infoCollector
  }
}