Flink 可以生成 aggregated/rolling/accumulated 数据的每小时快照吗?
Can Flink produce hourly snapshots of aggregated/rolling/accumulated data?
流处理的教科书示例是一个带时间戳的字数统计程序。具有以下数据样本
mario 10:00
luigi 10:01
mario 11:00
mario 12:00
我看过的字数统计程序超过:
总数据集
mario 3
luigi 1
一组时间window分区
mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 1
mario 12:00-13:00 1
但是我还没有找到滚动时间的字数统计程序示例 window,即我希望从时间开始时为每个字每小时生成一个字数统计:
mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 2
luigi 11:00-12:00 1
mario 12:00-13:00 3
luigi 12:00-13:00 1
Apache Flink 或任何其他流处理库可以做到这一点吗?谢谢!
编辑:
到目前为止,我已经尝试了 David Anderson 方法的一种变体,只是在对数据进行时间采样时更改事件时间的处理时间。它没有像我预期的那样工作。这是代码、示例数据、它提供的结果以及我的后续问题:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.setParallelism(1)
.setMaxParallelism(1);
env.setStreamTimeCharacteristic(EventTime);
String fileLocation = "full file path here";
DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);
rawInput.flatMap(parse())
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp - 1);
}
@Override
public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
return element.getTimestamp();
}
})
.keyBy(TimestampedWord::getWord)
.process(new KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>>() {
private transient ValueState<Long> count;
@Override
public void open(Configuration parameters) throws Exception {
count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
}
@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
if (count.value() == null) {
count.update(0L);
}
long l = ((value.getTimestamp() / 10) + 1) * 10;
ctx.timerService().registerEventTimeTimer(l);
count.update(count.value() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
long currentWatermark = ctx.timerService().currentWatermark();
out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
}
})
.addSink(new PrintlnSink());
env.execute();
}
private static long fileCounter = 0;
private static FlatMapFunction<String, TimestampedWord> parse() {
return new FlatMapFunction<String, TimestampedWord>() {
@Override
public void flatMap(String value, Collector<TimestampedWord> out) {
out.collect(new TimestampedWord(value, fileCounter++));
}
};
}
private static class TimestampedWord {
private final String word;
private final long timestamp;
private TimestampedWord(String word, long timestamp) {
this.word = word;
this.timestamp = timestamp;
}
public String getWord() {
return word;
}
public long getTimestamp() {
return timestamp;
}
}
private static class PrintlnSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<Tuple3<String, Long, Long>> {
@Override
public void invoke(Tuple3<String, Long, Long> value, Context context) throws Exception {
System.out.println(value.getField(0) + "=" + value.getField(1) + " at " + value.getField(2));
}
}
文件中包含以下文字,每行换行:
mario,luigi,mario,mario,vilma,fred,bob,bob,mario,dan,dylan,dylan,fred,mario,mario,carl,bambam,summer,anna,anna,edu,anna,anna ,安娜,安娜,安娜
产生以下输出:
mario=4 at 10
luigi=1 at 10
dan=1 at 10
bob=2 at 10
fred=1 at 10
vilma=1 at 10
dylan=2 at 20
fred=2 at 20
carl=1 at 20
anna=3 at 20
summer=1 at 20
bambam=1 at 20
mario=6 at 20
anna=7 at 9223372036854775807
edu=1 at 9223372036854775807
明显有问题。尽管 anna
这个词的第三个实例直到位置 22 才出现,但我在 20 处得到 anna
的计数为 3。奇怪的是 edu
只出现在最后一个快照,即使它出现在 anna
的第三次实例之前。即使没有消息到达(即应该生成相同的数据),我如何触发每 10 "units of time" 生成一次快照?
如果有人能指出正确的方向,我将不胜感激!
是的,这不仅可以用 Flink 做到,而且很容易。您可以使用 KeyedProcessFunction 来执行此操作,该函数将计数器保持在键控状态,以计算每个 word/key 目前在输入流中出现的次数。然后使用计时器触发报告。
这是一个使用处理时间计时器的示例。它每 10 秒打印一次报告。
public class DSExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1))
.keyBy(x -> x)
.process(new KeyedProcessFunction<String, String, Tuple3<Long, String, Integer>>() {
private transient ValueState<Integer> counter;
@Override
public void open(Configuration parameters) throws Exception {
counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class));
}
@Override
public void processElement(String s, Context context, Collector<Tuple3<Long, String, Integer>> collector) throws Exception {
if (counter.value() == null) {
counter.update(0);
long now = context.timerService().currentProcessingTime();
context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
}
counter.update(counter.value() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
long now = context.timerService().currentProcessingTime();
context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
out.collect(new Tuple3(now, context.getCurrentKey(), counter.value()));
}
})
.print();
env.execute();
}
}
已更新:
使用事件时间总是更好,但这确实增加了复杂性。大多数增加的复杂性源于这样一个事实,即在实际应用程序中,您很可能不得不处理乱序事件——您在示例中已经避免了这种情况,因此在这种情况下,我们可以通过相当简单的方式逃脱实施。
如果你改变两件事,你会得到你期望的结果。首先,将 Watermarks 设置为 extractedTimestamp - 1
是结果错误的原因(例如,这就是为什么 anna=3 at 20)。如果您将水印设置为 extractedTimestamp
,这个问题就会消失。
解释:正是第三个安娜的到来创建了在时间 20 关闭 window 的水印。第三个安娜的时间戳为 21,因此在流中紧随其后的是一个20 处的水印,关闭第二个 window 并生成报告说 anna=3。是的,第一个 edu 到达得更早,但它是第一个 edu,时间戳为 20。在 edu 到达时,没有为 edu 设置计时器,并且创建的计时器正确设置为在 30 时触发,所以我们在至少 30 的水印到来之前不要听说 edu。
另一个问题是定时器逻辑。 Flink 为每个键创建一个单独的计时器,每次计时器触发时您都需要创建一个新计时器。否则,您将只会收到有关 window 期间到达的单词的报告。您应该将代码修改为更像这样:
@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
if (count.value() == null) {
count.update(0L);
setTimer(ctx.timerService(), value.getTimestamp());
}
count.update(count.value() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
long currentWatermark = ctx.timerService().currentWatermark();
out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
if (currentWatermark < Long.MAX_VALUE) {
setTimer(ctx.timerService(), currentWatermark);
}
}
private void setTimer(TimerService service, long t) {
service.registerEventTimeTimer(((t / 10) + 1) * 10);
}
通过这些更改,我得到了这些结果:
mario=4 at 10
luigi=1 at 10
fred=1 at 10
bob=2 at 10
vilma=1 at 10
dan=1 at 10
vilma=1 at 20
luigi=1 at 20
dylan=2 at 20
carl=1 at 20
bambam=1 at 20
mario=6 at 20
summer=1 at 20
anna=2 at 20
bob=2 at 20
fred=2 at 20
dan=1 at 20
fred=2 at 9223372036854775807
dan=1 at 9223372036854775807
carl=1 at 9223372036854775807
dylan=2 at 9223372036854775807
vilma=1 at 9223372036854775807
edu=1 at 9223372036854775807
anna=7 at 9223372036854775807
summer=1 at 9223372036854775807
bambam=1 at 9223372036854775807
luigi=1 at 9223372036854775807
bob=2 at 9223372036854775807
mario=6 at 9223372036854775807
现在,如果您需要实际处理乱序事件,这会变得相当复杂。有必要让水印滞后于时间戳一些实际的量,以反映流中存在的实际乱序量,这将需要能够处理超过一个 window 打开一个时间。任何给定的 event/word 可能不属于接下来将关闭的 window,因此不应增加其计数器。例如,您可以将这些 "early" 事件缓冲在另一个状态(例如 ListState)中,或者以某种方式维护多个计数器(可能在 MapState 中)。此外,某些事件可能会延迟,从而使早期报告无效,您需要定义一些策略来处理该事件。
流处理的教科书示例是一个带时间戳的字数统计程序。具有以下数据样本
mario 10:00
luigi 10:01
mario 11:00
mario 12:00
我看过的字数统计程序超过:
总数据集
mario 3
luigi 1
一组时间window分区
mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 1
mario 12:00-13:00 1
但是我还没有找到滚动时间的字数统计程序示例 window,即我希望从时间开始时为每个字每小时生成一个字数统计:
mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 2
luigi 11:00-12:00 1
mario 12:00-13:00 3
luigi 12:00-13:00 1
Apache Flink 或任何其他流处理库可以做到这一点吗?谢谢!
编辑:
到目前为止,我已经尝试了 David Anderson 方法的一种变体,只是在对数据进行时间采样时更改事件时间的处理时间。它没有像我预期的那样工作。这是代码、示例数据、它提供的结果以及我的后续问题:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.setParallelism(1)
.setMaxParallelism(1);
env.setStreamTimeCharacteristic(EventTime);
String fileLocation = "full file path here";
DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);
rawInput.flatMap(parse())
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp - 1);
}
@Override
public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
return element.getTimestamp();
}
})
.keyBy(TimestampedWord::getWord)
.process(new KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>>() {
private transient ValueState<Long> count;
@Override
public void open(Configuration parameters) throws Exception {
count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
}
@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
if (count.value() == null) {
count.update(0L);
}
long l = ((value.getTimestamp() / 10) + 1) * 10;
ctx.timerService().registerEventTimeTimer(l);
count.update(count.value() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
long currentWatermark = ctx.timerService().currentWatermark();
out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
}
})
.addSink(new PrintlnSink());
env.execute();
}
private static long fileCounter = 0;
private static FlatMapFunction<String, TimestampedWord> parse() {
return new FlatMapFunction<String, TimestampedWord>() {
@Override
public void flatMap(String value, Collector<TimestampedWord> out) {
out.collect(new TimestampedWord(value, fileCounter++));
}
};
}
private static class TimestampedWord {
private final String word;
private final long timestamp;
private TimestampedWord(String word, long timestamp) {
this.word = word;
this.timestamp = timestamp;
}
public String getWord() {
return word;
}
public long getTimestamp() {
return timestamp;
}
}
private static class PrintlnSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<Tuple3<String, Long, Long>> {
@Override
public void invoke(Tuple3<String, Long, Long> value, Context context) throws Exception {
System.out.println(value.getField(0) + "=" + value.getField(1) + " at " + value.getField(2));
}
}
文件中包含以下文字,每行换行:
mario,luigi,mario,mario,vilma,fred,bob,bob,mario,dan,dylan,dylan,fred,mario,mario,carl,bambam,summer,anna,anna,edu,anna,anna ,安娜,安娜,安娜
产生以下输出:
mario=4 at 10
luigi=1 at 10
dan=1 at 10
bob=2 at 10
fred=1 at 10
vilma=1 at 10
dylan=2 at 20
fred=2 at 20
carl=1 at 20
anna=3 at 20
summer=1 at 20
bambam=1 at 20
mario=6 at 20
anna=7 at 9223372036854775807
edu=1 at 9223372036854775807
明显有问题。尽管 anna
这个词的第三个实例直到位置 22 才出现,但我在 20 处得到 anna
的计数为 3。奇怪的是 edu
只出现在最后一个快照,即使它出现在 anna
的第三次实例之前。即使没有消息到达(即应该生成相同的数据),我如何触发每 10 "units of time" 生成一次快照?
如果有人能指出正确的方向,我将不胜感激!
是的,这不仅可以用 Flink 做到,而且很容易。您可以使用 KeyedProcessFunction 来执行此操作,该函数将计数器保持在键控状态,以计算每个 word/key 目前在输入流中出现的次数。然后使用计时器触发报告。
这是一个使用处理时间计时器的示例。它每 10 秒打印一次报告。
public class DSExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1))
.keyBy(x -> x)
.process(new KeyedProcessFunction<String, String, Tuple3<Long, String, Integer>>() {
private transient ValueState<Integer> counter;
@Override
public void open(Configuration parameters) throws Exception {
counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class));
}
@Override
public void processElement(String s, Context context, Collector<Tuple3<Long, String, Integer>> collector) throws Exception {
if (counter.value() == null) {
counter.update(0);
long now = context.timerService().currentProcessingTime();
context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
}
counter.update(counter.value() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
long now = context.timerService().currentProcessingTime();
context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
out.collect(new Tuple3(now, context.getCurrentKey(), counter.value()));
}
})
.print();
env.execute();
}
}
已更新:
使用事件时间总是更好,但这确实增加了复杂性。大多数增加的复杂性源于这样一个事实,即在实际应用程序中,您很可能不得不处理乱序事件——您在示例中已经避免了这种情况,因此在这种情况下,我们可以通过相当简单的方式逃脱实施。
如果你改变两件事,你会得到你期望的结果。首先,将 Watermarks 设置为 extractedTimestamp - 1
是结果错误的原因(例如,这就是为什么 anna=3 at 20)。如果您将水印设置为 extractedTimestamp
,这个问题就会消失。
解释:正是第三个安娜的到来创建了在时间 20 关闭 window 的水印。第三个安娜的时间戳为 21,因此在流中紧随其后的是一个20 处的水印,关闭第二个 window 并生成报告说 anna=3。是的,第一个 edu 到达得更早,但它是第一个 edu,时间戳为 20。在 edu 到达时,没有为 edu 设置计时器,并且创建的计时器正确设置为在 30 时触发,所以我们在至少 30 的水印到来之前不要听说 edu。
另一个问题是定时器逻辑。 Flink 为每个键创建一个单独的计时器,每次计时器触发时您都需要创建一个新计时器。否则,您将只会收到有关 window 期间到达的单词的报告。您应该将代码修改为更像这样:
@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
if (count.value() == null) {
count.update(0L);
setTimer(ctx.timerService(), value.getTimestamp());
}
count.update(count.value() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
long currentWatermark = ctx.timerService().currentWatermark();
out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
if (currentWatermark < Long.MAX_VALUE) {
setTimer(ctx.timerService(), currentWatermark);
}
}
private void setTimer(TimerService service, long t) {
service.registerEventTimeTimer(((t / 10) + 1) * 10);
}
通过这些更改,我得到了这些结果:
mario=4 at 10
luigi=1 at 10
fred=1 at 10
bob=2 at 10
vilma=1 at 10
dan=1 at 10
vilma=1 at 20
luigi=1 at 20
dylan=2 at 20
carl=1 at 20
bambam=1 at 20
mario=6 at 20
summer=1 at 20
anna=2 at 20
bob=2 at 20
fred=2 at 20
dan=1 at 20
fred=2 at 9223372036854775807
dan=1 at 9223372036854775807
carl=1 at 9223372036854775807
dylan=2 at 9223372036854775807
vilma=1 at 9223372036854775807
edu=1 at 9223372036854775807
anna=7 at 9223372036854775807
summer=1 at 9223372036854775807
bambam=1 at 9223372036854775807
luigi=1 at 9223372036854775807
bob=2 at 9223372036854775807
mario=6 at 9223372036854775807
现在,如果您需要实际处理乱序事件,这会变得相当复杂。有必要让水印滞后于时间戳一些实际的量,以反映流中存在的实际乱序量,这将需要能够处理超过一个 window 打开一个时间。任何给定的 event/word 可能不属于接下来将关闭的 window,因此不应增加其计数器。例如,您可以将这些 "early" 事件缓冲在另一个状态(例如 ListState)中,或者以某种方式维护多个计数器(可能在 MapState 中)。此外,某些事件可能会延迟,从而使早期报告无效,您需要定义一些策略来处理该事件。