Source.Single() 过早终止具有 2 个源的流

Source.Single() terminates stream with 2 sources prematurely

我定义了一个简单的图形,它结合了恒定流(通过 Source.Single(5000) 定义)和非常量源(例如 Source(1 to 100))。我的图表通过 ZipLatestWith 运算符将两个数字加在一起。目的是让流的输出为 5001, 5002, 5003, 5004, ..., 5100。实际上,程序的输出只是 5000,然后流终止可能是因为 Single 源已完成。

如何获得预期的结果,即具有 5000 常量值的源与非常量源的每个值相结合?请注意,由于概念原因(与此特定示例无关),恒定源仍然是流的源很重要。一个完整的代码示例如下,它只是将数字 5000 打印到控制台。

Main.scala:

import akka.NotUsed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Behavior, Terminated}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, ZipLatestWith}
import akka.stream.{FlowShape, Materializer}

object Main {
  def apply(): Behavior[NotUsed] = {
    Behaviors.setup { context =>

      val constantStream: Source[Int, NotUsed] = Source.single(5000)
      val graph: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create(){
        implicit builder =>
          import GraphDSL.Implicits._

          val zipper = builder.add(ZipLatestWith[Int, Int, Int]((a: Int, b: Int) => a * b))
          constantStream ~> zipper.in1

          FlowShape(zipper.in0, zipper.out)
      })

      Source(1 to 100)
        .via(graph)
        .to(Sink.foreach(println)).run()(Materializer.apply(context))

      Behaviors.receiveSignal {
        case (_, Terminated(_)) =>
          Behaviors.stopped
      }
    }
  }

  def main(args: Array[String]): Unit = {
    ActorSystem(Main(), "test")
  }
}

build.sbt:

scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.0"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.6.0"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"

只需使用 Source.repeat(5000) 即可。

除了@cbley 所说的,你应该尽可能避免GraphDSL。最好使用现有的组合器,因为它不易出错且更容易理解。

上图可以用

表示
Source(1 to 100).zip(Source.repeat(5000))
  .map { case (x, y) => x + y }
  .to(Sink.foreach(println))