Akka Streams 从图中先前源的结果创建新源

Akka Streams create new Source from result of previous Source in a graph

我正在尝试创建采用 1 个输入源、采用最后一个元素并生成新源的图形。目前我只能通过 Sink.last.

实现第一个流来做到这一点

Source.fromFuture(curStream.runWith(Sink.last).map(last => createStreamFromOffset(last)))

找到解决方案:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val in = Source(1 to 10)

    val out = Sink.foreach(println)

    val concat = builder.add(Concat[Int](2))

    val f1 = Flow[Int].fold(0) { (acc, el) =>
      el
    }

    in ~> f1 ~> concat

    // creating new source from old stream last value
    in.via(f1).flatMapConcat(_ => Source(666 to 670)) ~> concat

    concat ~> out

    ClosedShape
  })
  g.run()