我可以在不使用内置 print() 函数的情况下在 Apache Flink 中打印 DataSteam<T> 的各个元素吗
Can I print Individual elements of DataSteam<T> in Apache Flink without using inbuilt print() function
我正在尝试打印在 Flink 中检测到的警告值
// 为每个匹配的警告模式生成温度警告
DataStream<TemperatureEvent> warnings = tempPatternStream.select(
(Map<String, MonitoringEvent> pattern) -> {
TemperatureEvent first = (TemperatureEvent) pattern.get("first");
return new TemperatureEvent(first.getRackID(), first.getTemperature()) ;
}
);
// Print the warning and alert events to stdout
warnings.print();
我得到如下输出(根据 eventSource 函数的 toString)
Rack id = 99 and temprature = 76.0
有人能告诉我,有什么方法可以不使用 print 打印 DataStream 的值吗?一个例子是,如果我只想打印温度,我如何访问 DataStream 中的各个元素。
提前致谢
我找到了访问单个元素的方法,假设我们有一个 DataStream
HeartRate<Integer,Integer>
它有 2 个属性
private Integer Patient_id ;
private Integer HR;
// 使用自定义函数生成数据流
DataStream<HREvent> hrEventDataStream = envrionment
.addSource(new HRGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
假设您已经使用自定义函数生成了数据流,现在我们可以打印 HeartRateEvent 的各个元素的值,如下所示
hrEventDataStream.keyBy(new KeySelector<HREvent, Integer>() {
@Override
public Integer getKey(HREvent hrEvent) throws Exception {
return hrEvent.getPatient_id();
}
})
.window(TumblingEventTimeWindows.of(milliseconds(10)))
.apply(new WindowFunction<HREvent, Object, Integer, TimeWindow>() {
@Override
public void apply(Integer integer, TimeWindow timeWindow, Iterable<HREvent> iterable, Collector<Object> collector) throws Exception {
for(HREvent in : iterable){
System.out.println("Patient id = " + in.getPatient_id() + " Heart Rate = " + in.getHR());
}//for
}//apply
});
希望对您有所帮助!
我正在尝试打印在 Flink 中检测到的警告值
// 为每个匹配的警告模式生成温度警告
DataStream<TemperatureEvent> warnings = tempPatternStream.select(
(Map<String, MonitoringEvent> pattern) -> {
TemperatureEvent first = (TemperatureEvent) pattern.get("first");
return new TemperatureEvent(first.getRackID(), first.getTemperature()) ;
}
);
// Print the warning and alert events to stdout
warnings.print();
我得到如下输出(根据 eventSource 函数的 toString)
Rack id = 99 and temprature = 76.0
有人能告诉我,有什么方法可以不使用 print 打印 DataStream 的值吗?一个例子是,如果我只想打印温度,我如何访问 DataStream 中的各个元素。
提前致谢
我找到了访问单个元素的方法,假设我们有一个 DataStream
HeartRate<Integer,Integer>
它有 2 个属性
private Integer Patient_id ;
private Integer HR;
// 使用自定义函数生成数据流
DataStream<HREvent> hrEventDataStream = envrionment
.addSource(new HRGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
假设您已经使用自定义函数生成了数据流,现在我们可以打印 HeartRateEvent 的各个元素的值,如下所示
hrEventDataStream.keyBy(new KeySelector<HREvent, Integer>() {
@Override
public Integer getKey(HREvent hrEvent) throws Exception {
return hrEvent.getPatient_id();
}
})
.window(TumblingEventTimeWindows.of(milliseconds(10)))
.apply(new WindowFunction<HREvent, Object, Integer, TimeWindow>() {
@Override
public void apply(Integer integer, TimeWindow timeWindow, Iterable<HREvent> iterable, Collector<Object> collector) throws Exception {
for(HREvent in : iterable){
System.out.println("Patient id = " + in.getPatient_id() + " Heart Rate = " + in.getHR());
}//for
}//apply
});
希望对您有所帮助!