Akka stream GraphStage is not running the final invocation of an async callback
I have been going round in circles on this, on and off, for a few weeks now without finding an answer, so I'm hoping that someone with a better understanding of Akka Streams can help.
I have a custom GraphStage that performs a bunch of logic and then calls out to an asynchronous process (in reality it materializes a substream to copy binary data from an external service to S3). It looks like this (simplified):
class UploadItemShape(shapeNameAnyOf:Seq[String], bucketName:String, cannedAcl:CannedAcl)(implicit comm:VSCommunicator, mat:Materializer)
  extends GraphStage[FlowShape[VSLazyItem, VSLazyItem]] with FilenameHelpers {

  private final val in:Inlet[VSLazyItem] = Inlet.create("UploadItemShape.in")
  private final val out:Outlet[VSLazyItem] = Outlet.create("UploadItemShape.out")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private implicit val logger:org.slf4j.Logger = LoggerFactory.getLogger(getClass)
    private var canComplete:Boolean = true

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val completedCb = createAsyncCallback[VSLazyItem](i => {
          logger.info(s"called completedCb")
          canComplete = true
          push(out, i)
        })
        val failedCb = createAsyncCallback[Throwable](err => {
          logger.error("Called failedCallback: ", err)
          canComplete = true
          failStage(err)
        })

        val elem = grab(in)
        canComplete = false

        val shapes = shapeNameAnyOf.map(shapeName =>
          findShape(elem, shapeName)
        ).collect({case Some(s)=>s})

        if(shapes.nonEmpty){
          if(shapes.length>1){
            logger.warn(s"Got multiple shapes $shapes for item ${elem.itemId}, using the first")
          }
          bunchOfFuturesChainedTogetherGivingUploadResult()
            .flatMap(uploadResult => {
              logger.info(s"Uploaded to ${uploadResult.location}")
              completedCb.invokeWithFeedback(elem)
            }).recoverWith({
              case err:Throwable =>
                logger.error(s"Could not perform upload for any of shape $shapeNameAnyOf on item ${elem.itemId}: ", err)
                failedCb.invokeWithFeedback(err)
            })
        } else {
          val actualShapeNames = elem.shapes.map(_.keySet)
          logger.error(s"No shapes could be found matching $shapeNameAnyOf on the given item (got $actualShapeNames)")
          push(out, elem)
        }
      }

      //override the finish function to ensure that any async processing has completed before we allow ourselves
      //to shut down
      override def onUpstreamFinish(): Unit = {
        var i = 0
        logger.info(s"Upstream finished")
        while(!canComplete){
          logger.info(s"Async processing ongoing, waiting for completion...")
          i += 1
          if(i>10) canComplete = true
          Thread.sleep(1000)
        }
        logger.info(s"Processing completed")
        completeStage()
      }
    })

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = pull(in)
      override def onDownstreamFinish(): Unit = {
        logger.info("Downstream finished")
      }
    })
  }
}
I run it through a test case that looks like this (again, simplified):
val resultFut = Source
  .single(item)
  .via(testStage)
  .log("streamComponents.UploadItemShape")
  .toMat(Sink.seq)(Keep.right)
  .run()

val result = Await.result(resultFut, 30 seconds)
where testStage is the initialized GraphStage, with the appropriate parts mocked out with Mockito.
So, when I initially ran it from the single-element Source above, my mocks were all invoked correctly but nothing was emitted at the end (Sink.seq gave me an empty sequence).
I then added the canComplete flag and the onUpstreamFinish and onDownstreamFinish handlers to show what was going on.
It waits until the "Async processing ongoing" messages time out, and I never see the "called completedCb" message. Then, once the stage has completed because of the timeout, I see "Stage stopped before async invocation was processed".
I then changed the test to push two items through the stage, like so:
val resultFut = Source
  .fromIterator(()=>Seq(item,item).toIterator).async
  .via(testStage)
  .log("streamComponents.UploadItemShape")
  .toMat(Sink.seq)(Keep.right)
  .run()

val result = Await.result(resultFut, 30 seconds)
The logs show the async callback being triggered for the first item, but not for the last one:
12:34:58.152 [scala-execution-context-global-57] INFO streamComponents.UploadItemShape$$anon - Determined /path/to/somefile.mp4 as the path to upload
12:34:58.153 [scala-execution-context-global-57] INFO streamComponents.UploadItemShape$$anon - Filename with fixed extension is /path/to/somefile.mp4
12:34:58.192 [scala-execution-context-global-57] INFO streamComponents.UploadItemShape$$anon - Uploaded to s3://somebucket/path/to/somefile
12:34:58.193 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - called completedCb
12:34:58.195 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Upstream finished
12:34:58.195 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:34:58.195 [scala-execution-context-global-95] INFO streamComponents.UploadItemShape$$anon - Determined /path/to/somefile.mp4 as the path to upload
12:34:58.196 [scala-execution-context-global-95] INFO streamComponents.UploadItemShape$$anon - Filename with fixed extension is /path/to/somefile.mp4
12:34:58.197 [scala-execution-context-global-95] INFO streamComponents.UploadItemShape$$anon - Uploaded to s3://somebucket/path/to/somefile
12:34:59.199 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:00.203 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:01.206 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:02.210 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:03.213 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:04.218 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:05.221 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:06.226 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:07.230 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:08.234 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:09.237 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Processing completed
12:35:09.239 [scala-execution-context-global-95] ERROR streamComponents.UploadItemShape$$anon - Could not perform upload for any of shape List(lowres, lowaudio, lowimage) on item VX-1234:
akka.stream.StreamDetachedException: Stage with GraphStageLogic streamComponents.UploadItemShape$$anon@10371e38 stopped before async invocation was processed
(Obviously no real upload is being performed; the Futures are supplied by Mockito as already-completed Futures.)
It seems that no matter what I do, once upstream has finished (I never see the "Downstream finished" message) the stage will not process the async invocations, despite not having completed yet and knowing that they still need to complete. I have searched all the documentation and source code I can find, but have not come across anything to help me with this headache. Any help would be much appreciated!
A graph stage runs inside an actor and can only do one thing at a time, which means that if you make it block inside a method (Thread.sleep) it can never do anything else concurrently.
If you want to wait for an outstanding async callback to arrive, you will have to "know" that there is an outstanding future by recording that fact somehow (a counter, for example). When completion happens you must then let the stage keep running by not calling completeStage(), and instead make sure to complete the stage once the async callback has arrived.
Note that an AsyncCallback is something that stays around for the lifetime of the running graph stage once you have created it, so you should not create one per pushed element but rather create a single one and reuse it, or you will have a memory leak on your hands.
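Putting those two points together, the pattern might look something like the following sketch. Note this is only an illustration of the idea, not code from the question: the stage name `DeferredCompletionStage` and the Future-returning `upload` function are assumptions. The callback is created once per materialization and reused, in-flight work is counted, and onUpstreamFinish defers completion instead of blocking:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Sketch only: `upload` stands in for whatever Future-returning work the stage does.
class DeferredCompletionStage[A](upload: A => Future[A])(implicit ec: ExecutionContext)
  extends GraphStage[FlowShape[A, A]] {

  private val in  = Inlet[A]("DeferredCompletionStage.in")
  private val out = Outlet[A]("DeferredCompletionStage.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private var inFlight = 0
    private var upstreamDone = false

    // Created once for the lifetime of the stage and reused for every element.
    private val resultCb = createAsyncCallback[Try[A]] {
      case Success(elem) =>
        inFlight -= 1
        push(out, elem)
        // Upstream already finished: complete only once nothing is outstanding.
        if (upstreamDone && inFlight == 0) completeStage()
      case Failure(err) =>
        failStage(err)
    }

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        inFlight += 1
        // Hop back onto the stage's own execution via the async callback.
        upload(elem).onComplete(resultCb.invoke)
      }
      override def onUpstreamFinish(): Unit =
        if (inFlight == 0) completeStage()
        else upstreamDone = true // defer completion, never block the actor
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })
  }
}
```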
GraphStage is a low-level API for implementing operators, and it requires quite a bit of detailed knowledge of Akka Streams. If all you want is to chain futures, I would recommend using one of the existing operators that deal with futures, such as mapAsync or mapAsyncUnordered, rather than trying to build your own; they will do what you want and will not complete until all outstanding futures have completed.
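For comparison, the equivalent of the custom stage using mapAsync would be roughly the following sketch; `uploadItem` is an assumed stand-in for the chained upload futures, not a function from the question:

```scala
// uploadItem stands in for bunchOfFuturesChainedTogetherGivingUploadResult()
def uploadItem(item: VSLazyItem): Future[VSLazyItem] = ???

val resultFut = Source(List(item, item))
  .mapAsync(parallelism = 4)(uploadItem) // emits in upstream order; the stream
                                         // only completes after every future finishes
  .runWith(Sink.seq)
```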
Running an akka-stream Source with runWith returns a Future that completes with either success or failure.
val resultFut = Source
  .fromIterator(()=>Seq(item,item).toIterator).async
  .via(testStage)
  .log("streamComponents.UploadItemShape")
  .runWith(Sink.seq)

resultFut.onComplete {
  case Success(value) => logger.info(s"stream completed successfully $value")
  case Failure(e) => logger.error(s"stream completed with failure: $e")
}
The onComplete callback is invoked when the stream finishes.
As I see it, the problem is that you have merely mapped a function over a Future. Mapping does not actually invoke the function until the result is triggered or assigned to a value that is used elsewhere. The point is that a Future is not acted upon simply by mapping or recovering over it.
If you change the flatMap to an onComplete, invoking completedCb in the Success case and failedCb in the Failure handler, this should work.
I have implemented something very similar to what is shown in your code and it works perfectly fine for me. Please check the modified code:
class UploadItemShape(shapeNameAnyOf:Seq[String], bucketName:String, cannedAcl:CannedAcl)(implicit comm:VSCommunicator, mat:Materializer)
  extends GraphStage[FlowShape[VSLazyItem, VSLazyItem]] with FilenameHelpers {

  private final val in:Inlet[VSLazyItem] = Inlet.create("UploadItemShape.in")
  private final val out:Outlet[VSLazyItem] = Outlet.create("UploadItemShape.out")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private implicit val logger:org.slf4j.Logger = LoggerFactory.getLogger(getClass)
    private var canComplete:Boolean = true

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        canComplete = false

        val shapes = shapeNameAnyOf.map(shapeName =>
          findShape(elem, shapeName)
        ).collect({case Some(s)=>s})

        if(shapes.nonEmpty){
          if(shapes.length>1){
            logger.warn(s"Got multiple shapes $shapes for item ${elem.itemId}, using the first")
          }
          bunchOfFuturesChainedTogetherGivingUploadResult()
            .onComplete {
              case Success(_) =>
                logger.info(s"called completedCb")
                canComplete = true
                push(out, elem)
              case Failure(ex) =>
                logger.error(s"Could not perform upload for any of shape $shapeNameAnyOf on item ${elem.itemId}: ", ex)
                canComplete = true
                failStage(ex)
            }
        } else {
          val actualShapeNames = elem.shapes.map(_.keySet)
          logger.error(s"No shapes could be found matching $shapeNameAnyOf on the given item (got $actualShapeNames)")
          push(out, elem)
        }
      }

      //override the finish function to ensure that any async processing has completed before we allow ourselves
      //to shut down
      override def onUpstreamFinish(): Unit = {
        var i = 0
        logger.info(s"Upstream finished")
        while(!canComplete){
          logger.info(s"Async processing ongoing, waiting for completion...")
          i += 1
          if(i>10) canComplete = true
          Thread.sleep(1000)
        }
        logger.info(s"Processing completed")
        completeStage()
      }
    })

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = pull(in)
      override def onDownstreamFinish(): Unit = {
        logger.info("Downstream finished")
      }
    })
  }
}