在 Akka Stream 中基于相同 ID 合并两个 Stream

Merge two Stream based on Same ID in Akka Stream

我有两个输入流。我想合并两个基于相同 ID 的流元素。这是代码详情

  implicit val system = ActorSystem("sourceDemo")
  implicit val materializer = ActorMaterializer()

  case class Foo(id: Int, value: String)
  case class Bar(id: Int, value: String)
  case class MergeResult(id: Int, fooValue: String, barValue: String)

  val sourceOne = Source(List.fill(100)(Foo(Random.nextInt(100), value = "foo")))
  val sourceTwo = Source(List.fill(100)(Bar(Random.nextInt(100), value = "bar")))

我想得到的结果是MergeResult,它基于FooBar中相同的id。

此外,对于某些 ID 不匹配的 FooBar,我想保留在内存中,我想知道是否有一种干净的方法来做到这一点,因为它是有状态的。

更重要的是,源元素是有序的。如果发现 ID 重复,则应采用先匹配先服务的策略。这意味着如果 Foo(1, "foo-1"), Foo(1, "foo-2")Bar(1, "Bar-1"),匹配应该是 MergeResult(1, "foo-1", "Bar-1") .

我目前正在查看来自 akka 流的一些解决方案。如果有一些好的解决方案,如 Spark、Flink 等,那也会有帮助。

提前致谢。

您描述的正是连接操作。

Akka 流不支持连接操作。您可能会找到一种方法,使用每个流上的窗口和一些 actor/stateful 转换在它们之间进行查找,但上次我搜索这个时我什么也没找到(不久前),所以您可能在未知水域。

您只会在更重量级的框架上找到流连接:Flink、Spark Streaming、Kafka 流。原因是 join 从根本上说是一个流对另一个流的查找,这意味着它需要比 Akka 流的设计者想要处理的更复杂的东西(状态管理)。