Apache Flink 中带有 TumblingWindow 的水印
Watermark with TumblingWindow in Apache Flink
我试图了解 Windows 和 Apache FLink 中 Watermark 生成之间的依赖关系,我在下面的示例中遇到错误:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(10000);
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("watermarkFlink", new SimpleStringSchema(), props);
DataStream<String> orderStream = env.addSource(kafkaSource);
DataStream<Order> dataStream = orderStream.map(str -> {
String[] order = str.split(",");
return new Order(order[0], Long.parseLong(order[1]), null);
});
WatermarkStrategy<Order> orderWatermarkStrategy = CustomWatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((element, timestamp) ->
element.getTimestamp()
);
dataStream
.assignTimestampsAndWatermarks(orderWatermarkStrategy)
.map(new OrderKeyValue())
.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> src) throws Exception {
return src.f0;
}
})
.window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))
.sum(1)
.print("Windows");
dataStream.print("Data");
env.execute();
}
public static class OrderKeyValue implements MapFunction<Order, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> map(Order order) {
return new Tuple2<>(order.getCategory(), 1);
}
}
这里的时间戳很长,我们可以从 Kafka 源中检索它应该像:A,4 C,8 其中 C 是类别,5 是时间戳。
每当我发送事件时,数据流都会打印,但不会使用 window (print("Windows")) 打印这些事件。
另外,例如,如果我收到一个事件 A,12,然后我生成了一个水印(在 10 秒内),那么我有 C,2,它是在第一个 windows 关闭之后出现的,它会在 window 还是会被忽略?
Flink 文档中有一个教程应该有助于阐明这些概念:https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/streaming_analytics/
但总结一下情况:
如果你有像 (A,4) (C,8) (A,12) 这样的事件流,那么这些整数将被解释为毫秒。
你先window会在触发前等待20000的水印。
要生成这么大的水印,您需要一个时间戳至少为 21000 的事件(因为有界乱序设置为 1 秒)。
并且由于您已将自动加水印间隔配置为 10 秒,因此您的应用程序必须 运行 在生成第一个水印之前这么长时间。 (我想不出任何情况下设置这么大的水印间隔会有帮助。)
如果事件在其 window 关闭后到达,则它将被忽略(默认情况下)。您可以配置 allowed lateness 来安排延迟事件以触发额外的 window 触发。
我试图了解 Windows 和 Apache FLink 中 Watermark 生成之间的依赖关系,我在下面的示例中遇到错误:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(10000);
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("watermarkFlink", new SimpleStringSchema(), props);
DataStream<String> orderStream = env.addSource(kafkaSource);
DataStream<Order> dataStream = orderStream.map(str -> {
String[] order = str.split(",");
return new Order(order[0], Long.parseLong(order[1]), null);
});
WatermarkStrategy<Order> orderWatermarkStrategy = CustomWatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((element, timestamp) ->
element.getTimestamp()
);
dataStream
.assignTimestampsAndWatermarks(orderWatermarkStrategy)
.map(new OrderKeyValue())
.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> src) throws Exception {
return src.f0;
}
})
.window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))
.sum(1)
.print("Windows");
dataStream.print("Data");
env.execute();
}
public static class OrderKeyValue implements MapFunction<Order, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> map(Order order) {
return new Tuple2<>(order.getCategory(), 1);
}
}
这里的时间戳很长,我们可以从 Kafka 源中检索它应该像:A,4 C,8 其中 C 是类别,5 是时间戳。
每当我发送事件时,数据流都会打印,但不会使用 window (print("Windows")) 打印这些事件。 另外,例如,如果我收到一个事件 A,12,然后我生成了一个水印(在 10 秒内),那么我有 C,2,它是在第一个 windows 关闭之后出现的,它会在 window 还是会被忽略?
Flink 文档中有一个教程应该有助于阐明这些概念:https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/streaming_analytics/
但总结一下情况:
如果你有像 (A,4) (C,8) (A,12) 这样的事件流,那么这些整数将被解释为毫秒。
你先window会在触发前等待20000的水印。
要生成这么大的水印,您需要一个时间戳至少为 21000 的事件(因为有界乱序设置为 1 秒)。
并且由于您已将自动加水印间隔配置为 10 秒,因此您的应用程序必须 运行 在生成第一个水印之前这么长时间。 (我想不出任何情况下设置这么大的水印间隔会有帮助。)
如果事件在其 window 关闭后到达,则它将被忽略(默认情况下)。您可以配置 allowed lateness 来安排延迟事件以触发额外的 window 触发。