Flink Complex Event Processing
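I have Flink CEP code that reads from a socket and detects a pattern. Say the pattern (a word) is 'alert'. If the word 'alert' occurs five or more times, an alert should be created. But I am getting an input mismatch error. The Flink version is 1.3.0. Thanks in advance!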
package pattern;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class cep {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);
        dss.print();

        Pattern<String,String> pattern = Pattern.<String> begin("first")
            .where(new IterativeCondition<String>() {
                @Override
                public boolean filter(String word, Context<String> context) throws Exception {
                    return word.equals("alert");
                }
            })
            .times(5);

        PatternStream<String> patternstream = CEP.pattern(dss, pattern);
        DataStream<String> alerts = patternstream
            .flatSelect((Map<String,List<String>> in, Collector<String> out) -> {
                String first = in.get("first").get(0);
                for (int i = 0; i < 6; i++ ) {
                    out.collect(first);
                }
            });
        alerts.print();
        env.execute();
    }
}
So I got the code working. Here is the working solution:
package pattern;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class cep {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);
        dss.print();

        Pattern<String,String> pattern = Pattern.<String> begin("first")
            .where(new IterativeCondition<String>() {
                @Override
                public boolean filter(String word, Context<String> context) throws Exception {
                    return word.equals("alert");
                }
            })
            .times(5);

        PatternStream<String> patternstream = CEP.pattern(dss, pattern);
        DataStream<String> alerts = patternstream
            .select(new PatternSelectFunction<String, String>() {
                @Override
                public String select(Map<String, List<String>> in) throws Exception {
                    String first = in.get("first").get(0);
                    if(first.equals("alert")){
                        return ("5 or more alerts");
                    }
                    else{
                        return (" ");
                    }
                }
            });
        alerts.print();
        env.execute();
    }
}
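Just a few clarifications on the original problem. Flink 1.3.0 had a bug that made it impossible to use a lambda as the argument to select/flatSelect. It was fixed in 1.3.1, so the first version of your code should work with 1.3.1.
Also, I think you misunderstood the times quantifier. It matches an exact number of times. So in your case, with times(5), it returns only when the event has matched exactly 5 times, not 5 or more.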
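To make the "five or more" reading concrete, here is a minimal library-free sketch of the threshold behaviour the question describes. It is plain Java, not Flink CEP API, and it does not try to reproduce how CEP builds or reports matches. The class name AlertThresholdSketch, the method detect and the message format are all hypothetical, chosen only for illustration. Later Flink releases document a timesOrMore(n) quantifier for this case; whether your version has it is something to check in the CEP documentation for that version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: models the "n or more" requirement
// from the question with a simple running counter.
class AlertThresholdSketch {

    /**
     * Walks the events in order and records one message for every occurrence
     * of `word` at which the running count is at or above `threshold`.
     */
    static List<String> detect(List<String> events, String word, int threshold) {
        List<String> alerts = new ArrayList<>();
        int seen = 0;
        for (String event : events) {
            if (word.equals(event)) {
                seen++;
                // "n or more": keep firing once the threshold has been reached
                if (seen >= threshold) {
                    alerts.add(threshold + " or more alerts (count=" + seen + ")");
                }
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        // Six "alert" events with one unrelated word in between
        List<String> input = Arrays.asList(
                "alert", "alert", "info", "alert", "alert", "alert", "alert");
        for (String message : detect(input, "alert", 5)) {
            System.out.println(message);
        }
    }
}
```

With the sample input, the counter fires on the fifth and sixth "alert", so two messages are produced. Running the same input through the CEP job gives a simple way to compare what the pattern actually emits against this threshold reading.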