Is the description of "Contiguity within looping patterns" in the documentation correct?
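As described in the Flink CEP documentation:
- Strict Contiguity: expects all matching events to appear strictly one after the other, without any non-matching events in between.
- Relaxed Contiguity: ignores non-matching events that appear between the matching ones.
- Non-Deterministic Relaxed Contiguity: further relaxes contiguity, allowing additional matches that ignore some matching events.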
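The first example is easy to understand:
Given the pattern "a b" and the input "a", "c", "b1", "b2":
- Strict Contiguity: {} (no match)
- Relaxed Contiguity: {a b1}
- Non-Deterministic Relaxed Contiguity: {a b1}, {a b2}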
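To make the three results concrete, here is a minimal plain-Java sketch that enumerates matches of "a b" under each mode. It is a toy model under stated assumptions, not Flink's NFA-based implementation: `PairContiguity`, `Mode` and `match` are hypothetical names, and an event counts as "a" or "b" purely by its first letter, as in the example.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the three contiguity modes for the
 * two-step pattern "a b". This is NOT Flink's implementation; it only
 * reproduces the results listed in the documentation.
 */
final class PairContiguity {

    enum Mode { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

    /** Returns every match as a two-element list [aEvent, bEvent]. */
    static List<List<String>> match(List<String> input, Mode mode) {
        List<List<String>> matches = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            if (!input.get(i).startsWith("a")) {
                continue; // only an "a" event can start a match
            }
            for (int j = i + 1; j < input.size(); j++) {
                boolean isB = input.get(j).startsWith("b");
                if (mode == Mode.STRICT) {
                    // The event right after "a" must be a "b"; anything else ends the attempt.
                    if (isB) {
                        matches.add(List.of(input.get(i), input.get(j)));
                    }
                    break;
                }
                if (isB) {
                    matches.add(List.of(input.get(i), input.get(j)));
                    if (mode == Mode.RELAXED) {
                        break; // relaxed: only the first matching "b" is used
                    }
                    // non-deterministic relaxed: keep scanning for further "b" events
                }
                // in both relaxed modes, non-matching events are simply skipped
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "c", "b1", "b2");
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + match(input, mode));
        }
        // STRICT: []
        // RELAXED: [[a, b1]]
        // NON_DETERMINISTIC_RELAXED: [[a, b1], [a, b2]]
    }
}
```

The only difference between the two relaxed modes in this sketch is whether scanning stops after the first "b", which mirrors "allowing additional matches that ignore some matching events".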
But the example for "Contiguity within looping patterns" is really hard to understand:
Given the pattern "a b+ c" and the input "a", "b1", "d1", "b2", "d2", "b3", "c":
- Strict Contiguity: {a b3 c}
- Relaxed Contiguity: {a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}
- Non-Deterministic Relaxed Contiguity: {a b1 c}, {a b1 b2 c}, {a b1 b3 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}
The Strict Contiguity output is {a b3 c}, but this contradicts the definition of Strict Contiguity: several events (b1, d1, b2, d2) lie between a and b3, and none of them is part of the match.
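The objection can be checked mechanically: strict contiguity means the matched events must occupy consecutive positions in the input. Below is a minimal plain-Java sketch of that check (no Flink involved). `StrictCheck` and `isStrictlyContiguous` are hypothetical names, and the check assumes every event value in the input is unique, so that `indexOf` locates the right position.

```java
import java.util.List;

/**
 * Hypothetical helper: checks whether a candidate match occupies
 * consecutive positions in the input, as strict contiguity requires.
 * Assumes event values in the input are unique.
 */
final class StrictCheck {

    static boolean isStrictlyContiguous(List<String> input, List<String> match) {
        int start = input.indexOf(match.get(0));
        if (start < 0) {
            return false; // the first matched event is not in the input at all
        }
        // Every following matched event must sit at the very next input position.
        for (int k = 1; k < match.size(); k++) {
            int pos = start + k;
            if (pos >= input.size() || !input.get(pos).equals(match.get(k))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b1", "d1", "b2", "d2", "b3", "c");
        // Prints false: b1, d1, b2 and d2 lie between "a" and "b3".
        System.out.println(isStrictlyContiguous(input, List.of("a", "b3", "c")));
    }
}
```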
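I believe you are right. With strict contiguity, there is no match at all. I wrote the following example to confirm it:

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Same input as the documentation example.
        DataStream<String> events = env.fromElements("a", "b1", "d1", "b2", "d2", "b3", "c");
        AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();

        // "a b+ c" with strict contiguity everywhere:
        // next() between the stages and consecutive() inside the b+ loop.
        Pattern<String, String> pattern =
                Pattern.<String>begin("a", skipStrategy)
                        .where(new SimpleCondition<String>() {
                            @Override
                            public boolean filter(String element) throws Exception {
                                return element.startsWith("a");
                            }
                        })
                        .next("b+")
                        .where(new SimpleCondition<String>() {
                            @Override
                            public boolean filter(String element) throws Exception {
                                return element.startsWith("b");
                            }
                        })
                        .oneOrMore().consecutive()
                        .next("c")
                        .where(new SimpleCondition<String>() {
                            @Override
                            public boolean filter(String element) throws Exception {
                                return element.startsWith("c");
                            }
                        });

        PatternStream<String> patternStream = CEP.pattern(events, pattern).inProcessingTime();
        // Each match is printed to stderr; with this input nothing is printed,
        // because d1 directly follows b1.
        patternStream.select(new SelectSegment()).addSink(new PrintSinkFunction<>(true));
        env.execute();
    }

    // Concatenates the events of one match, e.g. "ab1b2c".
    public static class SelectSegment implements PatternSelectFunction<String, String> {
        @Override
        public String select(Map<String, List<String>> pattern) {
            return String.join("", pattern.get("a"))
                    + String.join("", pattern.get("b+"))
                    + String.join("", pattern.get("c"));
        }
    }
}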
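For readers without a Flink setup, the same fully strict pattern can be approximated in plain Java. This is a hypothetical model (`StrictLoopModel` is not part of Flink) that assumes the semantics of next() and consecutive(): each stage must be satisfied by the event immediately after the previous one, so nothing may be skipped. Under that assumption the documentation input yields no match, while an input whose "b" events are adjacent yields one.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of "a b+ c" with strict contiguity everywhere:
 * "a", then an uninterrupted run of "b" events, then "c" right after it.
 * Assumes the "a", "b" and "c" categories are disjoint (first letter only).
 */
final class StrictLoopModel {

    static List<List<String>> match(List<String> input) {
        List<List<String>> matches = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            if (!input.get(i).startsWith("a")) {
                continue; // only an "a" event can start a match
            }
            List<String> current = new ArrayList<>();
            current.add(input.get(i));
            // Consume the run of "b" events that directly follows "a".
            int j = i + 1;
            while (j < input.size() && input.get(j).startsWith("b")) {
                current.add(input.get(j));
                j++;
            }
            // Require at least one "b" and a "c" at the very next position.
            if (current.size() > 1 && j < input.size() && input.get(j).startsWith("c")) {
                current.add(input.get(j));
                matches.add(current);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println(match(List.of("a", "b1", "d1", "b2", "d2", "b3", "c"))); // []
        System.out.println(match(List.of("a", "b1", "b2", "c"))); // [[a, b1, b2, c]]
    }
}
```

Taking the whole "b" run at once is sufficient here: a shorter run would leave another "b", not a "c", at the next position, so that branch could never complete under strict contiguity.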
I created FLINK-27456 to track this.