warnings.print() prints events in reverse order (last event first), except for the first event, in Apache Flink CEP
I am trying to filter all temperature events with a temperature greater than 50 in Flink, using the following pattern:
Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first")
        .subtype(TemperatureEvent.class)
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent temperatureEvent) throws Exception {
                return temperatureEvent.getTemperature() > 50;
            }
        });
The input is a text file that an input function parses into a stream. The file contains:
1,98
2,33
3,44
4,55
5,66
6,88
7,99
8,76
Here the first value is the Rack_id and the second is the temperature.
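To make the expected result concrete, here is a minimal plain-Java sketch (no Flink involved) that parses lines in this format and keeps the racks whose temperature is above 50. The names `ExpectedWarnings` and `hotRacks` are made up for illustration and are not part of the original job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ExpectedWarnings {

    /** Returns the rack ids whose temperature exceeds the threshold, in input order. */
    static List<Integer> hotRacks(List<String> lines, double threshold) {
        List<Integer> result = new ArrayList<>();
        for (String line : lines) {
            // Each line has the form "<rack id>,<temperature>".
            String[] parts = line.trim().split(",");
            int rackId = Integer.parseInt(parts[0].trim());
            double temperature = Double.parseDouble(parts[1].trim());
            if (temperature > threshold) {
                result.add(rackId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "1,98", "2,33", "3,44", "4,55", "5,66", "6,88", "7,99", "8,76");
        System.out.println(hotRacks(input, 50)); // prints [1, 4, 5, 6, 7, 8]
    }
}
```

So the warnings would be expected for racks 1, 4, 5, 6, 7 and 8, in that order.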
I call print() on both the input stream and the warnings stream, as shown below:
inputEventStream.print();
warnings.print();
Here is the problem. The Flink CEP job produces the following output:
08/10/2017 23:43:15 Job execution switched to status RUNNING.
08/10/2017 23:43:15 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED
08/10/2017 23:43:15 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING
08/10/2017 23:43:15 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
08/10/2017 23:43:15 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
08/10/2017 23:43:15 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to RUNNING
08/10/2017 23:43:15 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING
Rack id = 1 and temprature = 98.0)
Rack id = 2 and temprature = 33.0)
Rack id = 3 and temprature = 44.0)
Rack id = 4 and temprature = 55.0)
Rack id = 5 and temprature = 66.0)
Rack id = 6 and temprature = 88.0)
Rack id = 7 and temprature = 99.0)
Rack id = 8 and temprature = 76.0)
08/10/2017 23:43:16 Source: Custom Source -> Sink: Unnamed(1/1) switched to FINISHED
Rack id = 1 and temprature = 98.0)
Rack id = 8 and temprature = 76.0)
Rack id = 7 and temprature = 99.0)
Rack id = 6 and temprature = 88.0)
Rack id = 5 and temprature = 66.0)
Rack id = 4 and temprature = 55.0)
08/10/2017 23:43:16 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to FINISHED
08/10/2017 23:43:16 Job execution switched to status FINISHED.
Process finished with exit code 0
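As we can see, the first complex event (Rack id = 1 and temperature = 98.0) is printed in the same position as in the input, but after that all the other complex events with temp > 50 are printed in reverse order relative to the input stream.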
My questions are:
1. Any idea why the events are printed in reverse order?
2. Is there a custom way to print the values of the warning stream without using warnings.print()? For example, can I print only the temperature rather than the rack ID?
Thanks in advance.
This issue was resolved by assigning timestamps and watermarks to the input stream, as shown below:
// Input stream of monitoring events
DataStream<MonitoringEvent> inputEventStream = env
        .addSource(new InputStreamAGenerator())
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
The resulting output is shown below:
08/11/2017 00:45:09 Job execution switched to status RUNNING.
08/11/2017 00:45:09 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to SCHEDULED
08/11/2017 00:45:09 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to DEPLOYING
08/11/2017 00:45:09 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
08/11/2017 00:45:09 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
08/11/2017 00:45:09 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to RUNNING
08/11/2017 00:45:09 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to RUNNING
Rack id = 1 and temprature = 98.0)
Rack id = 4 and temprature = 55.0)
Rack id = 5 and temprature = 66.0)
Rack id = 6 and temprature = 88.0)
Rack id = 7 and temprature = 99.0)
Rack id = 8 and temprature = 76.0)
08/11/2017 00:45:10 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to FINISHED
08/11/2017 00:45:10 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to FINISHED
08/11/2017 00:45:10 Job execution switched to status FINISHED.
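One possible explanation for the reversed order, offered as an assumption rather than something confirmed in this thread: when the job runs in event time, the CEP operator in Flink versions of this era appears to buffer incoming records in a java.util.PriorityQueue ordered by timestamp until a watermark arrives. Without assigned timestamps, every record compares as equal, and OpenJDK's heap then happens to return the first element followed by the rest from last to first. The plain-Java sketch below (no Flink; `BufferOrderDemo`, `BufferedEvent` and `drainOrder` are hypothetical names) reproduces that pattern and shows that distinct, increasing timestamps restore the input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class BufferOrderDemo {

    /** Hypothetical stand-in for a buffered stream record: a rack id plus a timestamp. */
    static final class BufferedEvent {
        final int rackId;
        final long timestamp;

        BufferedEvent(int rackId, long timestamp) {
            this.rackId = rackId;
            this.timestamp = timestamp;
        }
    }

    /** Buffers racks 1..n in a timestamp-ordered queue, then drains it and returns the rack order. */
    static List<Integer> drainOrder(long[] timestamps) {
        PriorityQueue<BufferedEvent> buffer =
                new PriorityQueue<>(Comparator.comparingLong((BufferedEvent e) -> e.timestamp));
        for (int i = 0; i < timestamps.length; i++) {
            buffer.add(new BufferedEvent(i + 1, timestamps[i]));
        }
        List<Integer> order = new ArrayList<>();
        while (!buffer.isEmpty()) {
            order.add(buffer.poll().rackId);
        }
        return order;
    }

    public static void main(String[] args) {
        // Assumption: records without an assigned timestamp all carry the same value.
        long[] same = new long[8];
        Arrays.fill(same, Long.MIN_VALUE);
        // On OpenJDK: [1, 8, 7, 6, 5, 4, 3, 2]
        System.out.println("equal timestamps:      " + drainOrder(same));

        // Idealized ingestion-time case: distinct, increasing timestamps.
        long[] increasing = {1, 2, 3, 4, 5, 6, 7, 8};
        // [1, 2, 3, 4, 5, 6, 7, 8]
        System.out.println("increasing timestamps: " + drainOrder(increasing));
    }
}
```

Filtering the first result for temperatures above 50 leaves racks 1, 8, 7, 6, 5, 4, which matches the output in the question. Note that the order of equal elements is an implementation detail of OpenJDK's PriorityQueue, not something its documentation guarantees.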