"broadcast state" 取消阻止 Flink 的 CEP 库的“动态模式”功能的实现是什么意思?

What does it mean that "broadcast state" unblocks the implementation of the “dynamic patterns” feature for Flink’s CEP library?

从Flink 1.5发布公告中得知,Flink现在支持"broadcast state",并且描述为"broadcast state unblocks the implementation of the “dynamic patterns” feature for Flink’s CEP library.".

这是否意味着目前我们可以使用 "broadcast state" 来实现没有 Flink CEP 的“动态模式”? 我也不知道在有或没有广播状态的情况下为 Flink CEP 实现“动态模式”有什么区别?如果有人可以用代码举例来解释差异,我将不胜感激。

=============

更新以通过 operator broadcast() 使用键控数据流测试广播数据流

在Flink 1.4.2测试后,发现广播数据流(通过老operator broadcast())可以连接keyed datastream,下面是测试代码,我们发现所有的控制流事件都广播到所有运算符实例。 所以看起来旧的 broadcast() 可以实现与新的 "broadcast state" .

相同的功能
public static void ConnectBroadToKeyedStream() throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);

    List<Tuple1<String>>
            controlData = new ArrayList<Tuple1<String>>();
    controlData.add(new Tuple1<String>("DROP"));
    controlData.add(new Tuple1<String>("IGNORE"));
    DataStream<Tuple1<String>> control = env.fromCollection(controlData);//.keyBy(0);

    List<Tuple1<String>>
            dataStreamData = new ArrayList<Tuple1<String>>();
    dataStreamData.add(new Tuple1<String>("data"));
    dataStreamData.add(new Tuple1<String>("DROP"));
    dataStreamData.add(new Tuple1<String>("artisans"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));

    // DataStream<String> data2 = env.fromElements("data", "DROP", "artisans", "IGNORE");
    DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);

    DataStream<String> result = control
            .broadcast()
            .connect(keyedDataStream)
            .flatMap(new MyCoFlatMap());
    result.print();
    env.execute();
}

private static final class MyCoFlatMap
        implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> {
    HashSet blacklist = new HashSet();

    @Override
    public void flatMap1(Tuple1<String> control_value, Collector<String> out) {
        blacklist.add(control_value);
        out.collect("listed " + control_value);
    }

    @Override
    public void flatMap2(Tuple1<String> data_value, Collector<String> out) {

        if (blacklist.contains(data_value)) {
            out.collect("skipped " + data_value);
        } else {
            out.collect("passed " + data_value);
        }
    }
}

下面是测试结果

1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)

https://data-artisans.com/blog/apache-flink-1-5-0-release-announcement

没有广播状态,两个 Flink 数据流不能以有状态的方式一起处理,除非它们以完全相同的方式键入。广播流可以连接到键控流,但是如果您随后尝试在 RichCoFlatMap 中使用键控状态,例如,这将失败。

经常需要的是能够拥有一个具有动态 "rules" 的流,该流将应用于另一个流上的每个事件,而不管密钥如何。需要有一种新的托管 Flink 状态,可以在其中存储这些规则。使用 broadcast state,现在可以以一种直接的方式完成此操作。

有了这个功能,就可以开始支持 CEP 中的动态模式了。

这是一个代码示例,它实现了 flink 原始的无参数广播方法和 flink 1.5.0 上新引入的广播状态。 https://gist.github.com/syhily/932e0d1e0f12b3e951236d6c36e5a7ed

据我了解,广播状态可以不用flink cep实现,就像上面的代码。

原始 DataStreambroadcast 方法将创建 DataStream 而不是 BroadcastConnectedStream。这将是最初的 coGroup 设计方案。在将指标流与广播规则流连接后,我们可以使用 ConnectedStreams 中定义的更多流转换函数。例如 keyBy 函数,这将使 具有相同密钥 的广播流和连接流被 processed 并粘贴 在同一个并行CoProcessFunction。所以 CoProcessFunction 可以有自己的本地存储。除了从 ReadOnlyContext.

访问的地图状态之外,流程函数可以在其字段上具有自定义数据结构

广播状态可以通过broadcast方法和一组MapStateDescriptor来实现,这意味着广播流可以与其他流多次连接。不同的连接 BroadcastConnectedStream 可以通过 process 函数中的唯一 MapStateDescriptor 共享自己的广播状态。

我认为这些是带有参数的广播和广播状态之间的主要区别。