Flink Streaming:由控制流控制的数据流

Flink Streaming: Data stream that gets controlled by control stream

我有一个问题是这个问题的变体:

我有两个流:

  1. val ipStream: DataStream[IPAddress] = ???
  2. val routeStream: DataStream[RoutingTable] = ???

我想知道哪个包使用了哪条路由。通常这可以通过以下方式完成:

val ip = IPAddress("10.10.10.10")
val table = RoutingTable(Seq("10.10.10.0/24", "5.5.5.0/24"))
val route = table.lookup(ip) // == "10.10.10.0/24"

这里的问题是我无法在此处真正为流设置密钥,因为这需要完整的 table 以及 ip 地址(并​​且密钥必须单独计算)。

对于 ipStream 中的每个元素,我需要最新的 routeStream 元素。现在我正在使用一个 hack,所有这些都是非并行处理的:

ipStream
  .connect(routeStream)
  .keyBy(_ => 0, _ => 0)
  .flatMap(new MyRichCoFlatMapFunction) // with ValueState[RoutingTable]

这听起来像是广播策略的用例。但是,routeStream 会更新,不会固定在一个文件中。问题仍然存在:有没有办法拥有两个流,其中一个包含另一个流的更改控制数据?

既然我解决了这个问题,不妨在这里写一个答案:)

我这样输入了两个流:

  1. RoutingTable 流是使用网络路由的第一个字节作为键控的
  2. IP 地址也由地址的第一个字节键入

这在 IP 数据包通常使用相同的 /8 前缀在网络中路由的情况下有效,对于大多数流量都可以假设。

然后,通过有状态 RichCoFlatMap 可以建立路由 table 状态作为键。当收到一个新的 IP 包时,在路由 table 中进行查找。现在有两种可能的情况:

  1. 没有找到匹配的路线。我们可以将包存储在此处以备后用,但也可以丢弃它。
  2. 如果找到路由,则输出[IPAddress, RoutingTableEntry]的元组。

这样,我们就有了两个流,其中一个流具有更改另一个流的控制数据。