在 Akka Stream 中基于相同 ID 合并两个 Stream
Merge two Stream based on Same ID in Akka Stream
我有两个输入流。我想合并两个基于相同 ID 的流元素。这是代码详情
implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()
case class Foo(id: Int, value: String)
case class Bar(id: Int, value: String)
case class MergeResult(id: Int, fooValue: String, barValue: String)
val sourceOne = Source(List.fill(100)(Foo(Random.nextInt(100), value = "foo")))
val sourceTwo = Source(List.fill(100)(Bar(Random.nextInt(100), value = "bar")))
我想得到的结果是MergeResult
,它基于Foo
和Bar
中相同的id。
此外,对于某些 ID 不匹配的 Foo
和 Bar
,我想保留在内存中,我想知道是否有一种干净的方法来做到这一点,因为它是有状态的。
更重要的是,源元素是有序的。如果发现 ID 重复,则应采用先匹配先服务的策略。这意味着如果 Foo(1, "foo-1"), Foo(1, "foo-2")
和 Bar(1, "Bar-1")
,匹配应该是 MergeResult(1, "foo-1", "Bar-1")
.
我目前正在查看来自 akka 流的一些解决方案。如果有一些好的解决方案,如 Spark、Flink 等,那也会有帮助。
提前致谢。
您描述的正是连接操作。
Akka 流不支持连接操作。您可能会找到一种方法,使用每个流上的窗口和一些 actor/stateful 转换在它们之间进行查找,但上次我搜索这个时我什么也没找到(不久前),所以您可能在未知水域。
您只会在更重量级的框架上找到流连接:Flink、Spark Streaming、Kafka 流。原因是 join 从根本上说是一个流对另一个流的查找,这意味着它需要比 Akka 流的设计者想要处理的更复杂的东西(状态管理)。
我有两个输入流。我想合并两个基于相同 ID 的流元素。这是代码详情
implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()
case class Foo(id: Int, value: String)
case class Bar(id: Int, value: String)
case class MergeResult(id: Int, fooValue: String, barValue: String)
val sourceOne = Source(List.fill(100)(Foo(Random.nextInt(100), value = "foo")))
val sourceTwo = Source(List.fill(100)(Bar(Random.nextInt(100), value = "bar")))
我想得到的结果是MergeResult
,它基于Foo
和Bar
中相同的id。
此外,对于某些 ID 不匹配的 Foo
和 Bar
,我想保留在内存中,我想知道是否有一种干净的方法来做到这一点,因为它是有状态的。
更重要的是,源元素是有序的。如果发现 ID 重复,则应采用先匹配先服务的策略。这意味着如果 Foo(1, "foo-1"), Foo(1, "foo-2")
和 Bar(1, "Bar-1")
,匹配应该是 MergeResult(1, "foo-1", "Bar-1")
.
我目前正在查看来自 akka 流的一些解决方案。如果有一些好的解决方案,如 Spark、Flink 等,那也会有帮助。
提前致谢。
您描述的正是连接操作。
Akka 流不支持连接操作。您可能会找到一种方法,使用每个流上的窗口和一些 actor/stateful 转换在它们之间进行查找,但上次我搜索这个时我什么也没找到(不久前),所以您可能在未知水域。
您只会在更重量级的框架上找到流连接:Flink、Spark Streaming、Kafka 流。原因是 join 从根本上说是一个流对另一个流的查找,这意味着它需要比 Akka 流的设计者想要处理的更复杂的东西(状态管理)。