Akka-stream:在 mapAsync 完成处理数据之前调用 onComplete

Akka-stream: onComplete called before mapAsync has finished to process data

我目前正在处理一个使用 AKKA stream 1.0 处理数据的简单批处理。 如果我避免在流程步骤中使用 mapAsync 方法,一切都会 运行 顺利。

调用 oncomplete 时,最终确定结果文件并通过 Reaper actor 关闭代理系统(参见 Reaper 模式):

val file = new File(inputFile)
val run: Future[Int] = source(file)
 .via(parse)
 .via(enrich)
 .via(writeEnriched)
 .runWith(printProgress)
run.onComplete { result: Try[Int] =>
 context.system.log.info(s"Nb elements processed: ${result.get}")
 writerActorRef ! FinalizeResults()
}

我想加速的步骤之一是丰富数据的部分。有时,数据无法丰富,下一步应忽略。

def enrich(implicit ec: ExecutionContext) : Flow[Data, EnrichedData, Unit]
= Flow[Data].map(enriched.enrich(_)).collect {
 case Some(enrichedData) => enrichedData
}

所有这些代码 运行 很好,调用 onComplete() 时我没有丢失任何元素。

Input: 45639
Nb elements processed: 45639

当我尝试使用 mapAync 和 Future 而不是 map 来加速进行丰富步骤时,在处理完所有元素之前调用了 onComplete。

def enrich(implicit ec: ExecutionContext) : Flow[Data, EnrichedData, Unit]
   = Flow[Data].mapAsyncUnordered(8)(data => Future(enricher.enrich(data))).collect {
     case Some(enrichedData) => enrichedData
   }

我在最后遗漏了一些元素,而且从来没有相同的数字 所有这些代码都很好 运行 并且在调用 onComplete() 时我没有丢失任何元素。

Input: 45639
Nb elements processed: 45628

我无法找到一种方法来发现所有内容都已处理... 知道我做错了什么吗?

我终于找到问题了。我的问题不在 akka 流上,而是在一个非线程安全的限制检查器上……感谢您的帮助 对于那些感兴趣的人,我已经在 github 上上传了一些代码:https://github.com/PixelDuck/akka-stream-test