Akka stream GraphStage is not running the final invocation of an async callback

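I've been going round in circles with this, on and off, for a few weeks now and still haven't found an answer, so I'm hoping someone with a better understanding of Akka Streams can help.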

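I have a custom GraphStage that performs a bunch of logic and then calls an asynchronous process (it actually implements a sub-stream that copies binary data from an external service to S3). It looks like this (simplified):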

class UploadItemShape(shapeNameAnyOf:Seq[String], bucketName:String, cannedAcl:CannedAcl)(implicit comm:VSCommunicator, mat:Materializer)
  extends GraphStage[FlowShape[VSLazyItem, VSLazyItem ]] with FilenameHelpers {

  private final val in:Inlet[VSLazyItem] = Inlet.create("UploadItemShape.in")
  private final val out:Outlet[VSLazyItem] = Outlet.create("UploadItemShape.out")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private implicit val logger:org.slf4j.Logger = LoggerFactory.getLogger(getClass)

    private var canComplete:Boolean=true

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val completedCb = createAsyncCallback[VSLazyItem](i=>{
          logger.info(s"called completedCb")
          canComplete=true
          push(out, i)
        })

        val failedCb = createAsyncCallback[Throwable](err=>{
          logger.error("Called failedCallback: ", err)
          canComplete=true
          failStage(err)
        })

        val elem = grab(in)
        canComplete = false

        val shapes = shapeNameAnyOf.map(shapeName=>
          findShape(elem,shapeName)
        ).collect({case Some(s)=>s})

        if(shapes.nonEmpty){
          if(shapes.length>1){
            logger.warn(s"Got shapes multiple shapes $shapes for item ${elem.itemId}, using the first")
          }

          bunchOfFuturesChainedTogetherGivingUploadResult()
          .flatMap(uploadResult=>{
            logger.info(s"Uploaded to ${uploadResult.location}")
            completedCb.invokeWithFeedback(elem)
          }).recoverWith({
            case err:Throwable=>
              logger.error(s"Could not perform upload for any of shape $shapeNameAnyOf on item ${elem.itemId}: ", err)
              failedCb.invokeWithFeedback(err)
          })

        } else {
          val actualShapeNames = elem.shapes.map(_.keySet)
          logger.error(s"No shapes could be found matching $shapeNameAnyOf on the given item (got $actualShapeNames)")
          push(out, elem)
        }
      }

      //override the finish function to ensure that any async processing has completed before we allow ourselves
      //to shut down
      override def onUpstreamFinish(): Unit = {
        var i=0

        logger.info(s"Upstream finished")
        while(!canComplete){
          logger.info(s"Async processing ongoing, waiting for completion...")
          i+=1
          if(i>10) canComplete=true
          Thread.sleep(1000)
        }
        logger.info(s"Processing completed")
        completeStage()
      }
    })

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = pull(in)

      override def onDownstreamFinish(): Unit = {
        logger.info("Downstream finished")
      }
    })
  }
}

I run it through a test case that looks like this (again, simplified):

      val resultFut = Source
        .single(item)
        .via(testStage)
        .log("streamComponents.UploadItemShape")
        .toMat(Sink.seq)(Keep.right)
        .run()

      val result = Await.result(resultFut,30 seconds)

where testStage is the initialised GraphStage, with the relevant parts mocked out using Mockito.

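So, when I first ran it with the single-element source above, my mocks were all called correctly but nothing was emitted at the end (Sink.seq gave me an empty sequence). I then added the canComplete flag and the onUpstreamFinish and onDownstreamFinish handlers to show what was going on.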

It keeps waiting until the "Async processing ongoing" messages time out, and I never see the "called completedCb" message. Then, when the stage completes because of the timeout, I see "Stage stopped before async invocation was processed".

I then changed the test to push two items into the stage, like this:

      val resultFut = Source
        .fromIterator(()=>Seq(item,item).toIterator).async
        .via(testStage)
        .log("streamComponents.UploadItemShape")
        .toMat(Sink.seq)(Keep.right)
        .run()

      val result = Await.result(resultFut,30 seconds)

The logs show that the async callback fires for the first item, but not for the last one:

12:34:58.152 [scala-execution-context-global-57] INFO streamComponents.UploadItemShape$$anon - Determined /path/to/somefile.mp4 as the path to upload
12:34:58.153 [scala-execution-context-global-57] INFO streamComponents.UploadItemShape$$anon - Filename with fixed extension is /path/to/somefile.mp4
12:34:58.192 [scala-execution-context-global-57] INFO streamComponents.UploadItemShape$$anon - Uploaded to s3://somebucket/path/to/somefile
12:34:58.193 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - called completedCb
12:34:58.195 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Upstream finished
12:34:58.195 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:34:58.195 [scala-execution-context-global-95] INFO streamComponents.UploadItemShape$$anon - Determined /path/to/somefile.mp4 as the path to upload
12:34:58.196 [scala-execution-context-global-95] INFO streamComponents.UploadItemShape$$anon - Filename with fixed extension is /path/to/somefile.mp4
12:34:58.197 [scala-execution-context-global-95] INFO streamComponents.UploadItemShape$$anon - Uploaded to s3://somebucket/path/to/somefile
12:34:59.199 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:00.203 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:01.206 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:02.210 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:03.213 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:04.218 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:05.221 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:06.226 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:07.230 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:08.234 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Async processing ongoing, waiting for completion...
12:35:09.237 [default-akka.actor.default-dispatcher-6] INFO streamComponents.UploadItemShape$$anon - Processing completed
12:35:09.239 [scala-execution-context-global-95] ERROR streamComponents.UploadItemShape$$anon - Could not perform upload for any of shape List(lowres, lowaudio, lowimage) on item VX-1234: 
akka.stream.StreamDetachedException: Stage with GraphStageLogic streamComponents.UploadItemShape$$anon@10371e38 stopped before async invocation was processed

(Obviously no real upload is performed; the futures are supplied by Mockito as Future.complete.)

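It seems that whatever I do, once upstream has finished (I never see the "Downstream finished" message), the stage cannot process async invocations, even though it has not yet completed and knows that they still need to finish. I've searched all the documentation and source code I can find, but haven't found anything that helps me solve this headache. Any help would be greatly appreciated!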

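A graph stage runs inside an actor and can only do one thing at a time, which means that if you block inside one of its methods (Thread.sleep), it will never do anything else in the meantime, including running your async callbacks.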

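If you want to wait for outstanding async callbacks to arrive, you will have to "know" that there is an outstanding future by recording that fact somehow (for example with a counter). When onUpstreamFinish is called, you must keep the stage running by not calling completeStage(), and then make sure to complete the stage once the async callback has arrived.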

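Note that an AsyncCallback stays resident for the lifetime of the running graph stage once you have created it, so you should not create one for every pushed element; create one and reuse it, otherwise you will have a memory leak on your hands.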
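A minimal sketch of that approach, assuming Akka 2.6 and Scala 2.12 or 2.13. AsyncOneAtATime and AsyncOneAtATimeDemo are hypothetical names, not part of Akka. The stage counts in-flight futures, creates a single AsyncCallback once and reuses it for every element, and in onUpstreamFinish only calls completeStage() when nothing is outstanding; otherwise the callback completes the stage after pushing the last result. Nothing ever blocks the stage's thread.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical stage: applies an async function to one element at a time and
// defers completion until the in-flight future has delivered its result.
final class AsyncOneAtATime[A, B](f: A => Future[B]) extends GraphStage[FlowShape[A, B]] {
  val in: Inlet[A] = Inlet("AsyncOneAtATime.in")
  val out: Outlet[B] = Outlet("AsyncOneAtATime.out")
  override val shape: FlowShape[A, B] = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Number of futures started but not yet handed back to the stage.
      private var inFlight = 0

      // Created once per stage instance and reused for every element.
      private val onResult = getAsyncCallback[Try[B]] {
        case Success(b) =>
          inFlight -= 1
          push(out, b)
          // Upstream may have finished while we were waiting: finish now.
          if (isClosed(in)) completeStage()
        case Failure(e) =>
          failStage(e)
      }

      override def onPush(): Unit = {
        inFlight += 1
        // The future completes on another thread; only the callback touches stage state.
        f(grab(in)).onComplete(t => onResult.invoke(t))(materializer.executionContext)
      }

      // Ask for the next element only when downstream wants one.
      override def onPull(): Unit = pull(in)

      // Never block here; complete only if nothing is outstanding.
      override def onUpstreamFinish(): Unit =
        if (inFlight == 0) completeStage()

      setHandlers(in, out, this)
    }
}

object AsyncOneAtATimeDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("demo")
    try {
      val doubled = Source(List(1, 2, 3))
        .via(new AsyncOneAtATime[Int, Int](i => Future.successful(i * 2)))
        .runWith(Sink.seq)
      Await.result(doubled, 5.seconds)
    } finally system.terminate()
  }
}
```

With a source like Source(List(1, 2, 3)), upstream usually finishes while the last future is still in flight, which exercises the deferred-completion path that the blocking loop in the question prevents.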

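GraphStage is a low-level API for implementing operators, and it requires a fair amount of detailed knowledge about Akka Streams. If you just want to chain futures, I'd recommend using one of the existing operators that deal with futures, such as mapAsync or mapAsyncUnordered, rather than trying to build your own; they do what you want and will not complete until all outstanding futures have completed.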
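A sketch of the mapAsync alternative, assuming Akka 2.6. Item, upload and MapAsyncUpload are hypothetical stand-ins for VSLazyItem and the question's chain of upload futures. mapAsync with parallelism 1 runs one upload at a time, keeps element order, and only completes once every started future has finished; a failed future fails the stream under the default supervision strategy.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncUpload {
  // Hypothetical item type and upload function standing in for the real ones.
  final case class Item(id: String)

  def upload(item: Item)(implicit ec: ExecutionContext): Future[String] =
    Future(s"s3://somebucket/${item.id}")

  // Emits the original item once its upload future has succeeded.
  def uploadFlow(implicit ec: ExecutionContext): Flow[Item, Item, NotUsed] =
    Flow[Item].mapAsync(parallelism = 1)(item => upload(item).map(_ => item))

  def run(): Seq[Item] = {
    implicit val system: ActorSystem = ActorSystem("mapasync-demo")
    import system.dispatcher // implicit ExecutionContext for the futures
    try {
      val done = Source(List(Item("a"), Item("b")))
        .via(uploadFlow)
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }
}
```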

runWith on an akka-stream Source returns a Future that either succeeds or fails.

      val resultFut = Source
        .fromIterator(()=>Seq(item,item).iterator).async
        .via(testStage)
        .log("streamComponents.UploadItemShape")
        .runWith(Sink.seq)

      // requires import scala.util.{Failure, Success} and an implicit ExecutionContext in scope
      resultFut.onComplete {
        case Success(value) => logger.info(s"stream completed successfully $value")
        case Failure(e) => logger.error(s"stream completed with failure: $e")
      }

The onComplete callback will be called once the stream completes.

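As I see it, the problem is that you have only mapped a function onto a future. Mapping does not actually call the function until it is triggered or assigned to a value that is used elsewhere; the point is that a future is not materialised just by mapping or recovering it. If you change flatMap to onComplete and hook completedCb into the success handler and failedCb into the failure handler, this should work. I have implemented something very similar to what your code shows and it works fine for me. Please check the modified code: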

class UploadItemShape(shapeNameAnyOf:Seq[String], bucketName:String, cannedAcl:CannedAcl)(implicit comm:VSCommunicator, mat:Materializer)
  extends GraphStage[FlowShape[VSLazyItem, VSLazyItem ]] with FilenameHelpers {

  private final val in:Inlet[VSLazyItem] = Inlet.create("UploadItemShape.in")
  private final val out:Outlet[VSLazyItem] = Outlet.create("UploadItemShape.out")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private implicit val logger:org.slf4j.Logger = LoggerFactory.getLogger(getClass)

    private var canComplete:Boolean=true

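    // Create the async callbacks once per stage instance and reuse them for every element.
    // Invoking them is the thread-safe way to get back into the stage from a Future callback.
    private val completedCb = getAsyncCallback[VSLazyItem] { i =>
      logger.info(s"called completedCb")
      canComplete=true
      push(out, i)
    }

    private val failedCb = getAsyncCallback[Throwable] { err =>
      logger.error("Called failedCallback: ", err)
      canComplete=true
      failStage(err)
    }

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        import scala.util.{Failure, Success}

        val elem = grab(in)
        canComplete = false

        val shapes = shapeNameAnyOf.map(shapeName=>
          findShape(elem,shapeName)
        ).collect({case Some(s)=>s})

        if(shapes.nonEmpty){
          if(shapes.length>1){
            logger.warn(s"Got shapes multiple shapes $shapes for item ${elem.itemId}, using the first")
          }

          // onComplete runs on a Future thread, so never call push/failStage directly here;
          // hand the outcome back to the stage through the async callbacks instead
          bunchOfFuturesChainedTogetherGivingUploadResult()
          .onComplete {
            case Success(_) =>
              completedCb.invoke(elem)
            case Failure(err) =>
              logger.error(s"Could not perform upload for any of shape $shapeNameAnyOf on item ${elem.itemId}: ", err)
              failedCb.invoke(err)
          }(mat.executionContext)

        } else {
          val actualShapeNames = elem.shapes.map(_.keySet)
          logger.error(s"No shapes could be found matching $shapeNameAnyOf on the given item (got $actualShapeNames)")
          push(out, elem)
        }
      }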

      //override the finish function to ensure that any async processing has completed before we allow ourselves
      //to shut down
      override def onUpstreamFinish(): Unit = {
        var i=0

        logger.info(s"Upstream finished")
        while(!canComplete){
          logger.info(s"Async processing ongoing, waiting for completion...")
          i+=1
          if(i>10) canComplete=true
          Thread.sleep(1000)
        }
        logger.info(s"Processing completed")
        completeStage()
      }
    })

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = pull(in)

      override def onDownstreamFinish(): Unit = {
        logger.info("Downstream finished")
      }
    })
  }
}