Source.Single() 过早终止具有 2 个源的流
Source.Single() terminates stream with 2 sources prematurely
我定义了一个简单的图形,它结合了恒定流(通过 Source.Single(5000)
定义)和非常量源(例如 Source(1 to 100)
)。我的图表通过 ZipLatestWith
运算符将两个数字加在一起。目的是让流的输出为 5001, 5002, 5003, 5004, ..., 5100
。实际上,程序的输出只是 5000
,然后流终止可能是因为 Single
源已完成。
如何获得预期的结果,即具有 5000
常量值的源与非常量源的每个值相结合?请注意,由于概念原因(与此特定示例无关),恒定源仍然是流的源很重要。一个完整的代码示例如下,它只是将数字 5000
打印到控制台。
Main.scala:
import akka.NotUsed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Behavior, Terminated}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, ZipLatestWith}
import akka.stream.{FlowShape, Materializer}
object Main {
def apply(): Behavior[NotUsed] = {
Behaviors.setup { context =>
val constantStream: Source[Int, NotUsed] = Source.single(5000)
val graph: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create(){
implicit builder =>
import GraphDSL.Implicits._
val zipper = builder.add(ZipLatestWith[Int, Int, Int]((a: Int, b: Int) => a * b))
constantStream ~> zipper.in1
FlowShape(zipper.in0, zipper.out)
})
Source(1 to 100)
.via(graph)
.to(Sink.foreach(println)).run()(Materializer.apply(context))
Behaviors.receiveSignal {
case (_, Terminated(_)) =>
Behaviors.stopped
}
}
}
def main(args: Array[String]): Unit = {
ActorSystem(Main(), "test")
}
}
build.sbt:
scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.0"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.6.0"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
只需使用 Source.repeat(5000)
即可。
除了@cbley 所说的,你应该尽可能避免GraphDSL
。最好使用现有的组合器,因为它不易出错且更容易理解。
上图可以用
表示
Source(1 to 100).zip(Source.repeat(5000))
.map { case (x, y) => x + y }
.to(Sink.foreach(println))
我定义了一个简单的图形,它结合了恒定流(通过 Source.Single(5000)
定义)和非常量源(例如 Source(1 to 100)
)。我的图表通过 ZipLatestWith
运算符将两个数字加在一起。目的是让流的输出为 5001, 5002, 5003, 5004, ..., 5100
。实际上,程序的输出只是 5000
,然后流终止可能是因为 Single
源已完成。
如何获得预期的结果,即具有 5000
常量值的源与非常量源的每个值相结合?请注意,由于概念原因(与此特定示例无关),恒定源仍然是流的源很重要。一个完整的代码示例如下,它只是将数字 5000
打印到控制台。
Main.scala:
import akka.NotUsed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Behavior, Terminated}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, ZipLatestWith}
import akka.stream.{FlowShape, Materializer}
object Main {
def apply(): Behavior[NotUsed] = {
Behaviors.setup { context =>
val constantStream: Source[Int, NotUsed] = Source.single(5000)
val graph: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create(){
implicit builder =>
import GraphDSL.Implicits._
val zipper = builder.add(ZipLatestWith[Int, Int, Int]((a: Int, b: Int) => a * b))
constantStream ~> zipper.in1
FlowShape(zipper.in0, zipper.out)
})
Source(1 to 100)
.via(graph)
.to(Sink.foreach(println)).run()(Materializer.apply(context))
Behaviors.receiveSignal {
case (_, Terminated(_)) =>
Behaviors.stopped
}
}
}
def main(args: Array[String]): Unit = {
ActorSystem(Main(), "test")
}
}
build.sbt:
scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.0"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.6.0"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
只需使用 Source.repeat(5000)
即可。
除了@cbley 所说的,你应该尽可能避免GraphDSL
。最好使用现有的组合器,因为它不易出错且更容易理解。
上图可以用
表示Source(1 to 100).zip(Source.repeat(5000))
.map { case (x, y) => x + y }
.to(Sink.foreach(println))