翻滚后执行flink sink window
Execute flink sink after tumbling window
来源:Kinesis 数据流
接收器:弹性搜索
两者都使用 AWS 服务。
此外,运行 我在 AWS Kinesis 数据分析应用程序上的 Flink 作业
我遇到了 flink 的 windowing 功能问题。我的工作看起来像这样
DataStream<TrackingData> input = ...; // input from kinesis stream
input.keyBy(e -> e.getArea())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MyReduceFunction(), new MyProcessWindowFunction())
.addSink(<elasticsearch sink>);
private static class MyReduceFunction implements ReduceFunction<TrackingData> {
@Override
public TrackingData reduce(TrackingData trackingData, TrackingData t1) throws Exception {
trackingData.setVideoDuration(trackingData.getVideoDuration() + t1.getVideoDuration());
return trackingData;
}
}
private static class MyProcessWindowFunction extends ProcessWindowFunction<TrackingData, TrackingData, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<TrackingData> in,
Collector<TrackingData> out) {
TrackingData trackingIn = in.iterator().next();
Long videoDuration =0l;
for (TrackingData t: in) {
videoDuration += t.getVideoDuration();
}
trackingIn.setVideoDuration(videoDuration);
out.collect(trackingIn);
}
}
示例事件:
{"area":"sessions","userId":4450,"date":"2021-12-03T11:00:00","videoDuration":5}
我在这里所做的是从运动流中获取大量这些事件我想对 window 的每 10 秒求和 videoDuration
然后我想存储这个 单个事件进入elasticsearch。
在 Kinesis 中,每秒可以有 10,000 个事件。我不想在 elasticsearch 中存储所有 10,000 个事件我只想每 10 秒存储一个事件。
问题是当我向这个作业发送一个事件时,它会快速处理这个事件并直接进入 elasticsearch 但我想实现:直到每 10 秒我希望事件 videoDuration
时间递增,之后10 秒内只有一个事件存储在 elasticearch 中。
我怎样才能做到这一点?
我认为你误诊了问题。
您编写的代码将从每个 10 秒长的 window 中为每个在 window[=26= 期间有事件的不同键生成一个事件]. MyProcessWindowFunction
没有任何效果:由于 window 结果已经预先聚合,每个 Iterable 将只包含一个事件。
我相信你想这样做:
input.keyBy(e -> e.getArea())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MyReduceFunction())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MyReduceFunction())
.addSink(<elasticsearch sink>);
你也可以这样做
input.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MyReduceFunction())
.addSink(<elasticsearch sink>);
但第一个版本会更快,因为它将能够在计算 windowAll.[=13= 中的全局总和之前并行计算每个键的 window 结果。 ]
FWIW,Table/SQL API 通常更适合此类应用程序,并且应该产生比其中任何一个都更优化的管道。
来源:Kinesis 数据流
接收器:弹性搜索
两者都使用 AWS 服务。
此外,运行 我在 AWS Kinesis 数据分析应用程序上的 Flink 作业
我遇到了 flink 的 windowing 功能问题。我的工作看起来像这样
DataStream<TrackingData> input = ...; // input from kinesis stream
input.keyBy(e -> e.getArea())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MyReduceFunction(), new MyProcessWindowFunction())
.addSink(<elasticsearch sink>);
private static class MyReduceFunction implements ReduceFunction<TrackingData> {
@Override
public TrackingData reduce(TrackingData trackingData, TrackingData t1) throws Exception {
trackingData.setVideoDuration(trackingData.getVideoDuration() + t1.getVideoDuration());
return trackingData;
}
}
private static class MyProcessWindowFunction extends ProcessWindowFunction<TrackingData, TrackingData, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<TrackingData> in,
Collector<TrackingData> out) {
TrackingData trackingIn = in.iterator().next();
Long videoDuration =0l;
for (TrackingData t: in) {
videoDuration += t.getVideoDuration();
}
trackingIn.setVideoDuration(videoDuration);
out.collect(trackingIn);
}
}
示例事件:
{"area":"sessions","userId":4450,"date":"2021-12-03T11:00:00","videoDuration":5}
我在这里所做的是从运动流中获取大量这些事件我想对 window 的每 10 秒求和 videoDuration
然后我想存储这个 单个事件进入elasticsearch。
在 Kinesis 中,每秒可以有 10,000 个事件。我不想在 elasticsearch 中存储所有 10,000 个事件我只想每 10 秒存储一个事件。
问题是当我向这个作业发送一个事件时,它会快速处理这个事件并直接进入 elasticsearch 但我想实现:直到每 10 秒我希望事件 videoDuration
时间递增,之后10 秒内只有一个事件存储在 elasticearch 中。
我怎样才能做到这一点?
我认为你误诊了问题。
您编写的代码将从每个 10 秒长的 window 中为每个在 window[=26= 期间有事件的不同键生成一个事件]. MyProcessWindowFunction
没有任何效果:由于 window 结果已经预先聚合,每个 Iterable 将只包含一个事件。
我相信你想这样做:
input.keyBy(e -> e.getArea())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MyReduceFunction())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MyReduceFunction())
.addSink(<elasticsearch sink>);
你也可以这样做
input.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MyReduceFunction())
.addSink(<elasticsearch sink>);
但第一个版本会更快,因为它将能够在计算 windowAll.[=13= 中的全局总和之前并行计算每个键的 window 结果。 ]
FWIW,Table/SQL API 通常更适合此类应用程序,并且应该产生比其中任何一个都更优化的管道。