Akka Streams 在处理 n 个元素后停止流

Akka Streams stop stream after process n elements

我有 Akka Stream 流,它使用 alpakka 从文件中读取数据,处理数据并写入文件。我想在处理完 n 个元素后停止流程,计算持续时间并调用系统终止。我怎样才能实现它?

我的流程是这样的:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import GraphDSL.Implicits._

 sourceFile ~> mainFlow ~> sinkFile

ClosedShape
})

graph.run()

你有什么想法吗?谢谢

这里不需要GraphDSL。

val doneFuture = (sourceFile via mainFlow.take(N) runWith sinkFile) transformWith { _ => system.terminate() }

获取时间,可以使用akka-streams-contribhttps://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/scala/akka/stream/contrib/Timed.scala

同意@Viktor的说法,首先你不需要使用graphDSL来实现这个,你可以使用take(n)来完成图形。

其次,您可以使用 mapMaterializedValue 将回调附加到您的 Sink 物化值(反过来应该物化为 Future[Something])。

  val graph: RunnableGraph[Future[FiniteDuration]] =
    sourceFile
      .via(mainFlow)
      .take(N)
      .toMat(sinkFile)(Keep.right)
      .mapMaterializedValue { f ⇒
        val start = System.nanoTime()
        f.map(_ ⇒ FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
      }

  graph.run().onComplete { duration ⇒
    println(s"Elapsed time: $duration")
  }

请注意,您将需要 ExecutionContext 范围。

编辑

即使您必须使用 graphDSL,同样的概念也适用。您只需要公开水槽的物化 Future 并在其上进行映射。

  val graph: RunnableGraph[Future[??Something??]] = 
    RunnableGraph.fromGraph(GraphDSL.create(sinkFile) { implicit builder: GraphDSL.Builder[Future[Something]] => snk =>
    import GraphDSL.Implicits._

    sourceFile ~> mainFlow ~> snk

    ClosedShape
  })

  val timedGraph: RunnableGraph[Future[FiniteDuration]] = 
    graph.mapMaterializedValue { f ⇒
      val start = System.nanoTime()
      f.map(_ ⇒ FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
    }

  timedGraph.run().onComplete { duration ⇒
    println(s"Elapsed time: $duration")
  }