kafka 源流上的事件时间 window
Event time window on kafka source streaming
Kafka 服务器中有主题。在程序中,我们将此主题作为流读取并分配事件时间戳。然后对这个流做window操作。但该程序不起作用。调试后,WindowOperator 的processWatermark 方法似乎没有执行。这是我的代码。
DataStream<Tuple2<String, Long>> advertisement = env
.addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
.map(new MapFunction<String, Tuple2<String, Long>>() {
private static final long serialVersionUID = -6564495005753073342L;
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] splits = value.split(" ");
return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
}
}).assignTimestamps(timestampExtractor);
advertisement
.keyBy(keySelector)
.window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
.apply(new WindowFunction<Tuple2<String,Long>, Integer, String, TimeWindow>() {
private static final long serialVersionUID = 5151607280638477891L;
@Override
public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<Integer> out) throws Exception {
out.collect(Iterables.size(values));
}
}).print();
为什么会这样?如果我在 "assignTimestamps(timestampExtractor)" 之前添加 "keyBy(keySelector)" 那么程序就可以运行。谁能帮忙解释一下原因?
您受到 Flink 中已知错误的影响:FLINK-3121:Watermark forwarding does not work for sources not producing any data。
问题是 FlinkKafkaConsumer 的 运行(很可能是 CPU 核心的数量(比如 4))然后你有分区 (1)。只有一个 Kafka 消费者发出水印,其他消费者处于空闲状态。
window 运营商不知道这一点,等待所有消费者的水印到达。这就是 windows 永远不会触发的原因。
Kafka 服务器中有主题。在程序中,我们将此主题作为流读取并分配事件时间戳。然后对这个流做window操作。但该程序不起作用。调试后,WindowOperator 的processWatermark 方法似乎没有执行。这是我的代码。
DataStream<Tuple2<String, Long>> advertisement = env
.addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
.map(new MapFunction<String, Tuple2<String, Long>>() {
private static final long serialVersionUID = -6564495005753073342L;
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] splits = value.split(" ");
return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
}
}).assignTimestamps(timestampExtractor);
advertisement
.keyBy(keySelector)
.window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
.apply(new WindowFunction<Tuple2<String,Long>, Integer, String, TimeWindow>() {
private static final long serialVersionUID = 5151607280638477891L;
@Override
public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<Integer> out) throws Exception {
out.collect(Iterables.size(values));
}
}).print();
为什么会这样?如果我在 "assignTimestamps(timestampExtractor)" 之前添加 "keyBy(keySelector)" 那么程序就可以运行。谁能帮忙解释一下原因?
您受到 Flink 中已知错误的影响:FLINK-3121:Watermark forwarding does not work for sources not producing any data。
问题是 FlinkKafkaConsumer 的 运行(很可能是 CPU 核心的数量(比如 4))然后你有分区 (1)。只有一个 Kafka 消费者发出水印,其他消费者处于空闲状态。
window 运营商不知道这一点,等待所有消费者的水印到达。这就是 windows 永远不会触发的原因。