处理完所有元素后如何向 Sink 发出信号?
How to signal the Sink when all elements have been processed?
我有一个接受一系列文件的图表,一个一个地处理它们,然后在执行结束时,程序应该 return 成功(0)或失败(-1)如果全部执行成功或失败。
这最后一步如何实现? Sink 如何知道它何时收到最后一个文件的结果?
val graph = createGraph("path-to-list-of-files")
val result = graph.run()
def createGraph(fileOrPath: String): RunnableGraph[NotUsed] = {
printStage("PREPARING") {
val producer: Source[ProducerFile, NotUsed] = Producer(fileOrPath).toSource()
val validator: Flow[ProducerFile, ProducerFile, NotUsed] = Validator().toFlow()
val provisioner: Flow[ProducerFile, PrivisionerResult, NotUsed] = Provisioner().toFlow()
val executor: Flow[PrivisionerResult, ExecutorResult, NotUsed] = Executor().toFlow()
val evaluator: Flow[ExecutorResult, EvaluatorResult, NotUsed] = Evaluator().toFlow()
val reporter: Sink[EvaluatorResult, Future[Done]] = Reporter().toSink()
val graphResult = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
producer ~> validator ~> provisioner ~> executor ~> evaluator ~> reporter
ClosedShape
})
printLine("The graph pipeline was created")
graphResult
}
您的 reporter
接收器已经具体化为 Future[Done]
,如果您想在处理完所有元素后 运行 一些代码,您可以挂钩它。
但是,目前您没有在图表中公开它。虽然有一种方法可以使用图形 DSL 来公开它,但在您的情况下,使用流畅的 DSL 来实现这一点甚至更容易:
val graphResult: RunnableGraph[Future[Done]] = producer
.via(validator)
.via(provisioner)
.via(executor)
.via(evaluator)
.toMat(reporter)(Keep.right)
当您 运行 您的图表
时,这将返回 Future[Done]
val result: Future[Done] = graph.run()
然后你可以挂钩 - 例如
result.onComplete {
case Success(_) => println("Success!")
case Failure(_) => println("Failure..")
}
我有一个接受一系列文件的图表,一个一个地处理它们,然后在执行结束时,程序应该 return 成功(0)或失败(-1)如果全部执行成功或失败。
这最后一步如何实现? Sink 如何知道它何时收到最后一个文件的结果?
val graph = createGraph("path-to-list-of-files")
val result = graph.run()
def createGraph(fileOrPath: String): RunnableGraph[NotUsed] = {
printStage("PREPARING") {
val producer: Source[ProducerFile, NotUsed] = Producer(fileOrPath).toSource()
val validator: Flow[ProducerFile, ProducerFile, NotUsed] = Validator().toFlow()
val provisioner: Flow[ProducerFile, PrivisionerResult, NotUsed] = Provisioner().toFlow()
val executor: Flow[PrivisionerResult, ExecutorResult, NotUsed] = Executor().toFlow()
val evaluator: Flow[ExecutorResult, EvaluatorResult, NotUsed] = Evaluator().toFlow()
val reporter: Sink[EvaluatorResult, Future[Done]] = Reporter().toSink()
val graphResult = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
producer ~> validator ~> provisioner ~> executor ~> evaluator ~> reporter
ClosedShape
})
printLine("The graph pipeline was created")
graphResult
}
您的 reporter
接收器已经具体化为 Future[Done]
,如果您想在处理完所有元素后 运行 一些代码,您可以挂钩它。
但是,目前您没有在图表中公开它。虽然有一种方法可以使用图形 DSL 来公开它,但在您的情况下,使用流畅的 DSL 来实现这一点甚至更容易:
val graphResult: RunnableGraph[Future[Done]] = producer
.via(validator)
.via(provisioner)
.via(executor)
.via(evaluator)
.toMat(reporter)(Keep.right)
当您 运行 您的图表
时,这将返回Future[Done]
val result: Future[Done] = graph.run()
然后你可以挂钩 - 例如
result.onComplete {
case Success(_) => println("Success!")
case Failure(_) => println("Failure..")
}