Flink DataStream排序程序不输出
Flink DataStream sort program does not output
我在 Flink 中编写了一个小测试用例代码来对数据流进行排序。代码如下:
public enum StreamSortTest {
;
public static class MyProcessWindowFunction extends ProcessWindowFunction<Long,Long,Integer, TimeWindow> {
@Override
public void process(Integer key, Context ctx, Iterable<Long> input, Collector<Long> out) {
List<Long> sortedList = new ArrayList<>();
for(Long i: input){
sortedList.add(i);
}
Collections.sort(sortedList);
sortedList.forEach(l -> out.collect(l));
}
}
public static void main(final String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);
DataStream<Long> probeSource = env.fromSequence(1, 500).setParallelism(2);
// range partition the stream into two parts based on data value
DataStream<Long> sortOutput =
probeSource
.keyBy(x->{
if(x<250){
return 1;
} else {
return 2;
}
})
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.process(new MyProcessWindowFunction())
;
sortOutput.print();
System.out.println(env.getExecutionPlan());
env.executeAsync();
}
}
但是,代码只输出了执行计划和其他几行。但它不输出实际排序的数字。我做错了什么?
我看到的主要问题是您使用的是基于 ProcessingTime
的 window,输入数据非常短,处理时间肯定会短于 20 秒。虽然 Flink 能够检测到输入结束(如果是来自文件或序列的流,如您的情况)并生成 Long.Max
水印,这将关闭所有基于 windows 的打开事件时间并触发所有事件时间基于定时器。对于基于 ProcessingTime
的计算,它不会做同样的事情,因此在您的情况下,您需要断言 Flink 实际上会工作足够长的时间,以便您的 window 关闭或参考自定义 trigger/different 时间特性.
另一件事我不确定,因为我从来没有那么多地使用过它,你是否应该使用 executeAsync
进行本地执行,因为这基本上意味着你不想等待根据文档 here.
的作业结果
我在 Flink 中编写了一个小测试用例代码来对数据流进行排序。代码如下:
public enum StreamSortTest {
;
public static class MyProcessWindowFunction extends ProcessWindowFunction<Long,Long,Integer, TimeWindow> {
@Override
public void process(Integer key, Context ctx, Iterable<Long> input, Collector<Long> out) {
List<Long> sortedList = new ArrayList<>();
for(Long i: input){
sortedList.add(i);
}
Collections.sort(sortedList);
sortedList.forEach(l -> out.collect(l));
}
}
public static void main(final String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);
DataStream<Long> probeSource = env.fromSequence(1, 500).setParallelism(2);
// range partition the stream into two parts based on data value
DataStream<Long> sortOutput =
probeSource
.keyBy(x->{
if(x<250){
return 1;
} else {
return 2;
}
})
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.process(new MyProcessWindowFunction())
;
sortOutput.print();
System.out.println(env.getExecutionPlan());
env.executeAsync();
}
}
但是,代码只输出了执行计划和其他几行。但它不输出实际排序的数字。我做错了什么?
我看到的主要问题是您使用的是基于 ProcessingTime
的 window,输入数据非常短,处理时间肯定会短于 20 秒。虽然 Flink 能够检测到输入结束(如果是来自文件或序列的流,如您的情况)并生成 Long.Max
水印,这将关闭所有基于 windows 的打开事件时间并触发所有事件时间基于定时器。对于基于 ProcessingTime
的计算,它不会做同样的事情,因此在您的情况下,您需要断言 Flink 实际上会工作足够长的时间,以便您的 window 关闭或参考自定义 trigger/different 时间特性.
另一件事我不确定,因为我从来没有那么多地使用过它,你是否应该使用 executeAsync
进行本地执行,因为这基本上意味着你不想等待根据文档 here.