How can I detect the pattern a+b+ with Flink CEP?

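Flink CEP is not working for my pattern. I have a sequence such as aabbbbaaaabbabb, which I want to match against a+b+. I would like the process function to produce output like this: {aabbbb} {aaaabb} {abb}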

AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
Pattern<JsonNode, JsonNode> attemptPattern = Pattern.<JsonNode>begin("first", skipStrategy)
        .where(new SPCondition() {
            @Override
            public boolean filter(JsonNode element, Context<JsonNode> context) throws Exception {
                return element.get("endpoint").textValue().equals("A");
            }
        }).oneOrMore()
        .next("second")
        .where(new SPCondition() {
            @Override
            public boolean filter(JsonNode element, Context<JsonNode> context) throws Exception {
                return element.get("endpoint").textValue().equals("B");
            }
        }).oneOrMore();

My result:

{aab} {aaaab} {ab}

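You need to somehow insist that it take as many b's as possible, rather than matching as soon as the first one arrives. Here is one way to do that: add a third pattern, "end", that only matches an event that is not a b, so a match cannot complete until the run of b's is over. Note that the test input ends with an "x" so that the final run is terminated as well.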

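import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

public class CEPExample {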

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> events = env.fromElements("a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "b", "b", "a", "b", "b", "x");

        AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("end");
        Pattern<String, String> pattern = Pattern.<String>begin("first", skipStrategy)
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return (element.equals("a"));
                    }
                }).oneOrMore()
                .next("second")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return (element.equals("b"));
                    }
                }).oneOrMore()
                .next("end")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return (!element.equals("b"));
                    }
                });

        PatternStream<String> patternStream = CEP.pattern(events, pattern);
        patternStream.select(new SelectSegment()).print();
        env.execute();
    }

    public static class SelectSegment implements PatternSelectFunction<String, String> {
        public String select(Map<String, List<String>> pattern) {
            return String.join("", pattern.get("first")) + String.join("", pattern.get("second"));
        }
    }

}
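
To make the role of the "end" pattern more concrete, here is a minimal sketch in plain Java, with no Flink involved, of the same idea: a run of a's followed by b's is only emitted once an event that is not a b shows up. The class name GreedySegmenter and the method segment are made up for this illustration; this is not how Flink CEP is implemented internally, just a model of the matching behavior the pattern above is aiming for.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only (not part of Flink).
public class GreedySegmenter {

    // Splits the events into a+b+ runs. A run is emitted only when an event
    // that is not "b" arrives after at least one "b" has been seen.
    public static List<String> segment(List<String> events) {
        List<String> matches = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean seenB = false;
        for (String e : events) {
            if (e.equals("a")) {
                if (seenB) {
                    // An "a" after the b's terminates the run and starts a new one.
                    matches.add(current.toString());
                    current.setLength(0);
                    seenB = false;
                }
                current.append(e);
            } else if (e.equals("b")) {
                // A "b" only counts if at least one "a" came before it.
                if (current.length() > 0) {
                    current.append(e);
                    seenB = true;
                }
            } else {
                // Any other event terminates the run and discards partial state.
                if (seenB) {
                    matches.add(current.toString());
                }
                current.setLength(0);
                seenB = false;
            }
        }
        // A trailing run with no terminator is never emitted.
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList(
                "a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "b", "b", "a", "b", "b", "x");
        System.out.println(segment(events)); // prints [aabbbb, aaaabb, abb]
    }
}
```

If you drop the trailing "x" from the input, the last run (abb) is never emitted, because nothing arrives to say that the b's have ended. The same applies to the CEP job above, which is why the sentinel is in the test data.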

If you want to match a+b* instead, I feel there should be a simpler solution, but here is something that works:

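import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CEPExample {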

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> events = env.fromElements("a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "x");

        AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("end");
        Pattern<String, String> pattern = Pattern.<String>begin("a-or-b", skipStrategy)
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return element.equals("a") || element.equals("b");
                    }
                }).oneOrMore()
                .next("end")
                .where(new IterativeCondition<String>() {
                    @Override
                    public boolean filter(String element, Context<String> ctx) throws Exception {
                        List<String> list = new ArrayList<>();
                        ctx.getEventsForPattern("a-or-b").iterator().forEachRemaining(list::add);
                        int length = list.size();
                        if (!element.equals("a") && !element.equals("b")) return true;
                        return (((length >= 1) && element.equals("a") && list.get(length - 1).equals("b")));
                    }
                });

        PatternStream<String> patternStream = CEP.pattern(events, pattern);
        patternStream.select(new SelectSegment()).print();
        env.execute();
    }

    public static class SelectSegment implements PatternSelectFunction<String, String> {
        public String select(Map<String, List<String>> pattern) {
            return String.join("", pattern.get("a-or-b"));
        }
    }

}

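For what it's worth, I usually find that match_recognize (the MATCH_RECOGNIZE clause in Flink SQL) provides a more straightforward DSL for pattern matching with Flink.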
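
As a rough idea of what that could look like, here is an untested sketch of a MATCH_RECOGNIZE query for a+b+, held in a Java string so it could be passed to a TableEnvironment's sqlQuery. The table name endpoint_events and the columns endpoint and ts are assumptions for this example, and ts would have to be a time attribute for the ORDER BY to be accepted. As far as I know, Flink does not allow a greedy quantifier on the last pattern variable, so the sketch uses the same trick as the CEP version: a trailing variable C that matches the first row that is not a b.

```java
// Hypothetical holder class; the table and column names are assumptions.
public class MatchRecognizeSketch {

    public static final String QUERY =
            "SELECT *\n"
          + "FROM endpoint_events\n"
          + "MATCH_RECOGNIZE (\n"
          + "    ORDER BY ts\n"
          + "    MEASURES\n"
          + "        FIRST(A.ts) AS start_ts,\n"
          + "        LAST(B.ts) AS end_ts,\n"
          + "        COUNT(A.endpoint) AS a_count,\n"
          + "        COUNT(B.endpoint) AS b_count\n"
          + "    ONE ROW PER MATCH\n"
          // Resume at the terminating row, since it may be the first 'a' of the next run.
          + "    AFTER MATCH SKIP TO FIRST C\n"
          + "    PATTERN (A+ B+ C)\n"
          + "    DEFINE\n"
          + "        A AS A.endpoint = 'a',\n"
          + "        B AS B.endpoint = 'b',\n"
          + "        C AS C.endpoint <> 'b'\n"
          + ")";

    public static void main(String[] args) {
        // Only prints the query text; running it requires a Flink table environment
        // with a table registered under the assumed name.
        System.out.println(QUERY);
    }
}
```

As with the CEP job, the last run in a finite input is only reported if some row that is not a b arrives after it.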