处理完所有元素后如何向 Sink 发出信号?

How to signal the Sink when all elements have been processed?

我有一个接受一系列文件的图表,一个一个地处理它们,然后在执行结束时,程序应该 return 成功(0)或失败(-1)如果全部执行成功或失败。
这最后一步如何实现? Sink 如何知道它何时收到最后一个文件的结果?

val graph = createGraph("path-to-list-of-files")
val result = graph.run()

def createGraph(fileOrPath: String): RunnableGraph[NotUsed] = {
  printStage("PREPARING") {
  val producer: Source[ProducerFile, NotUsed] = Producer(fileOrPath).toSource()
  val validator: Flow[ProducerFile, ProducerFile, NotUsed] = Validator().toFlow()
  val provisioner: Flow[ProducerFile, PrivisionerResult, NotUsed] = Provisioner().toFlow()
  val executor: Flow[PrivisionerResult, ExecutorResult, NotUsed] = Executor().toFlow()
  val evaluator: Flow[ExecutorResult, EvaluatorResult, NotUsed] = Evaluator().toFlow()
  val reporter: Sink[EvaluatorResult, Future[Done]] = Reporter().toSink()

  val graphResult = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._
    producer ~> validator ~> provisioner ~> executor ~> evaluator ~> reporter

    ClosedShape
  })
  printLine("The graph pipeline was created")
  graphResult
}

您的 reporter 接收器已经具体化为 Future[Done],如果您想在处理完所有元素后 运行 一些代码,您可以挂钩它。

但是,目前您没有在图表中公开它。虽然有一种方法可以使用图形 DSL 来公开它,但在您的情况下,使用流畅的 DSL 来实现这一点甚至更容易:

  val graphResult: RunnableGraph[Future[Done]] = producer
    .via(validator)
    .via(provisioner)
    .via(executor)
    .via(evaluator)
    .toMat(reporter)(Keep.right)

当您 运行 您的图表

时,这将返回 Future[Done]
val result: Future[Done] = graph.run()

然后你可以挂钩 - 例如

result.onComplete {
  case Success(_) => println("Success!")
  case Failure(_) => println("Failure..")
}