Flink Streaming:由控制流控制的数据流
Flink Streaming: Data stream that gets controlled by control stream
我有一个问题是这个问题的变体:
我有两个流:
val ipStream: DataStream[IPAddress] = ???
val routeStream: DataStream[RoutingTable] = ???
我想知道哪个包使用了哪条路由。通常这可以通过以下方式完成:
val ip = IPAddress("10.10.10.10")
val table = RoutingTable(Seq("10.10.10.0/24", "5.5.5.0/24"))
val route = table.lookup(ip) // == "10.10.10.0/24"
这里的问题是我无法在此处真正为流设置密钥,因为这需要完整的 table 以及 ip 地址(并且密钥必须单独计算)。
对于 ipStream
中的每个元素,我需要最新的 routeStream
元素。现在我正在使用一个 hack,所有这些都是非并行处理的:
ipStream
.connect(routeStream)
.keyBy(_ => 0, _ => 0)
.flatMap(new MyRichCoFlatMapFunction) // with ValueState[RoutingTable]
这听起来像是广播策略的用例。但是,routeStream 会更新,不会固定在一个文件中。问题仍然存在:有没有办法拥有两个流,其中一个包含另一个流的更改控制数据?
既然我解决了这个问题,不妨在这里写一个答案:)
我这样输入了两个流:
- RoutingTable 流是使用网络路由的第一个字节作为键控的
- IP 地址也由地址的第一个字节键入
这在 IP 数据包通常使用相同的 /8 前缀在网络中路由的情况下有效,对于大多数流量都可以假设。
然后,通过有状态 RichCoFlatMap
可以建立路由 table 状态作为键。当收到一个新的 IP 包时,在路由 table 中进行查找。现在有两种可能的情况:
- 没有找到匹配的路线。我们可以将包存储在此处以备后用,但也可以丢弃它。
- 如果找到路由,则输出[IPAddress, RoutingTableEntry]的元组。
这样,我们就有了两个流,其中一个流具有更改另一个流的控制数据。
我有一个问题是这个问题的变体:
我有两个流:
val ipStream: DataStream[IPAddress] = ???
val routeStream: DataStream[RoutingTable] = ???
我想知道哪个包使用了哪条路由。通常这可以通过以下方式完成:
val ip = IPAddress("10.10.10.10")
val table = RoutingTable(Seq("10.10.10.0/24", "5.5.5.0/24"))
val route = table.lookup(ip) // == "10.10.10.0/24"
这里的问题是我无法在此处真正为流设置密钥,因为这需要完整的 table 以及 ip 地址(并且密钥必须单独计算)。
对于 ipStream
中的每个元素,我需要最新的 routeStream
元素。现在我正在使用一个 hack,所有这些都是非并行处理的:
ipStream
.connect(routeStream)
.keyBy(_ => 0, _ => 0)
.flatMap(new MyRichCoFlatMapFunction) // with ValueState[RoutingTable]
这听起来像是广播策略的用例。但是,routeStream 会更新,不会固定在一个文件中。问题仍然存在:有没有办法拥有两个流,其中一个包含另一个流的更改控制数据?
既然我解决了这个问题,不妨在这里写一个答案:)
我这样输入了两个流:
- RoutingTable 流是使用网络路由的第一个字节作为键控的
- IP 地址也由地址的第一个字节键入
这在 IP 数据包通常使用相同的 /8 前缀在网络中路由的情况下有效,对于大多数流量都可以假设。
然后,通过有状态 RichCoFlatMap
可以建立路由 table 状态作为键。当收到一个新的 IP 包时,在路由 table 中进行查找。现在有两种可能的情况:
- 没有找到匹配的路线。我们可以将包存储在此处以备后用,但也可以丢弃它。
- 如果找到路由,则输出[IPAddress, RoutingTableEntry]的元组。
这样,我们就有了两个流,其中一个流具有更改另一个流的控制数据。