Akka Streams 在处理 n 个元素后停止流
Akka Streams stop stream after process n elements
我有 Akka Stream 流,它使用 alpakka 从文件中读取数据,处理数据并写入文件。我想在处理完 n 个元素后停止流程,计算持续时间并调用系统终止。我怎样才能实现它?
我的流程是这样的:
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
sourceFile ~> mainFlow ~> sinkFile
ClosedShape
})
graph.run()
你有什么想法吗?谢谢
这里不需要GraphDSL。
val doneFuture = (sourceFile via mainFlow.take(N) runWith sinkFile) transformWith { _ => system.terminate() }
获取时间,可以使用akka-streams-contrib
:https://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/scala/akka/stream/contrib/Timed.scala
同意@Viktor的说法,首先你不需要使用graphDSL来实现这个,你可以使用take(n)
来完成图形。
其次,您可以使用 mapMaterializedValue
将回调附加到您的 Sink 物化值(反过来应该物化为 Future[Something]
)。
val graph: RunnableGraph[Future[FiniteDuration]] =
sourceFile
.via(mainFlow)
.take(N)
.toMat(sinkFile)(Keep.right)
.mapMaterializedValue { f ⇒
val start = System.nanoTime()
f.map(_ ⇒ FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
}
graph.run().onComplete { duration ⇒
println(s"Elapsed time: $duration")
}
请注意,您将需要 ExecutionContext
范围。
编辑
即使您必须使用 graphDSL,同样的概念也适用。您只需要公开水槽的物化 Future
并在其上进行映射。
val graph: RunnableGraph[Future[??Something??]] =
RunnableGraph.fromGraph(GraphDSL.create(sinkFile) { implicit builder: GraphDSL.Builder[Future[Something]] => snk =>
import GraphDSL.Implicits._
sourceFile ~> mainFlow ~> snk
ClosedShape
})
val timedGraph: RunnableGraph[Future[FiniteDuration]] =
graph.mapMaterializedValue { f ⇒
val start = System.nanoTime()
f.map(_ ⇒ FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
}
timedGraph.run().onComplete { duration ⇒
println(s"Elapsed time: $duration")
}
我有 Akka Stream 流,它使用 alpakka 从文件中读取数据,处理数据并写入文件。我想在处理完 n 个元素后停止流程,计算持续时间并调用系统终止。我怎样才能实现它?
我的流程是这样的:
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
sourceFile ~> mainFlow ~> sinkFile
ClosedShape
})
graph.run()
你有什么想法吗?谢谢
这里不需要GraphDSL。
val doneFuture = (sourceFile via mainFlow.take(N) runWith sinkFile) transformWith { _ => system.terminate() }
获取时间,可以使用akka-streams-contrib
:https://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/scala/akka/stream/contrib/Timed.scala
同意@Viktor的说法,首先你不需要使用graphDSL来实现这个,你可以使用take(n)
来完成图形。
其次,您可以使用 mapMaterializedValue
将回调附加到您的 Sink 物化值(反过来应该物化为 Future[Something]
)。
val graph: RunnableGraph[Future[FiniteDuration]] =
sourceFile
.via(mainFlow)
.take(N)
.toMat(sinkFile)(Keep.right)
.mapMaterializedValue { f ⇒
val start = System.nanoTime()
f.map(_ ⇒ FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
}
graph.run().onComplete { duration ⇒
println(s"Elapsed time: $duration")
}
请注意,您将需要 ExecutionContext
范围。
编辑
即使您必须使用 graphDSL,同样的概念也适用。您只需要公开水槽的物化 Future
并在其上进行映射。
val graph: RunnableGraph[Future[??Something??]] =
RunnableGraph.fromGraph(GraphDSL.create(sinkFile) { implicit builder: GraphDSL.Builder[Future[Something]] => snk =>
import GraphDSL.Implicits._
sourceFile ~> mainFlow ~> snk
ClosedShape
})
val timedGraph: RunnableGraph[Future[FiniteDuration]] =
graph.mapMaterializedValue { f ⇒
val start = System.nanoTime()
f.map(_ ⇒ FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
}
timedGraph.run().onComplete { duration ⇒
println(s"Elapsed time: $duration")
}