计算总数并在 flink 中定期发出
Calculate totals and emit periodically in flink
我有一个关于资源的事件流,如下所示:
id, type, count
1, view, 1
1, download, 3
2, view, 1
3, view, 1
1, download, 2
3, view, 1
我正在尝试为每个资源生成统计信息(总计),因此如果我得到如上所示的流,结果应该是:
id, views, downloads
1, 1, 5
2, 1, 0
3, 2, 0
现在我写了一个 ProcessFunction 来计算这样的总数:
public class CountTotals extends ProcessFunction<Event, ResourceTotals> {
private ValueState<ResourceTotals> totalsState;
@Override
public void open(Configuration config) throws Exception {
ValueStateDescriptor<ResourceTotals> totalsDescriptor = new ValueStateDescriptor<>("totals state", ResourceTotals.class);
totalsDescriptor.setQueryable("resource-totals");
totalsState = getRuntimeContext().getState(totalsDescriptor);
}
@Override
public void processElement(Event event, Context ctx, Collector<ResourceTotals> out) throws Exception {
ResourceTotals totals = totalsState.value();
if (totals == null) {
totals = new ResourceTotals();
totals.id = event.id;
}
switch (event.type) {
case "view":
totals.views += event.count;
break;
case "download":
totals.downloads += event.count;
}
totalsState.update(totals);
out.collect(totals);
}
}
从代码中可以明显看出,它会为每个事件发出一个新的 ResourceTotals,但我想每分钟一次而不是更频繁地发出每个资源的总数。
我尝试使用全局 window 和触发器 (ContinuousProcessingTimeTrigger) 进行试验,但无法使其正常工作。我遇到的问题是:
- 如何表达我想要window的最后一个事件?
- 如何不最终存储在该全局 window 中产生的所有 ResourceTotal?
如有任何帮助,我们将不胜感激。
您可以使用计时器每分钟发出一次 totalsState 中的值。由于我在您的数据流中看不到任何时间戳,我想您会使用处理时间计时器。
另一种方法是将 ProcessFunction 替换为 TimeWindow 以及保留最后一个事件的 ReduceFunction。
在任何一种情况下,您都可以考虑通过 ID 和类型字段对流进行键控,这应该会稍微简化您的状态管理。
已更新:
是的,定时器是状态的一部分,由 Flink 设置检查点和恢复状态。
我有一个关于资源的事件流,如下所示:
id, type, count
1, view, 1
1, download, 3
2, view, 1
3, view, 1
1, download, 2
3, view, 1
我正在尝试为每个资源生成统计信息(总计),因此如果我得到如上所示的流,结果应该是:
id, views, downloads
1, 1, 5
2, 1, 0
3, 2, 0
现在我写了一个 ProcessFunction 来计算这样的总数:
public class CountTotals extends ProcessFunction<Event, ResourceTotals> {
private ValueState<ResourceTotals> totalsState;
@Override
public void open(Configuration config) throws Exception {
ValueStateDescriptor<ResourceTotals> totalsDescriptor = new ValueStateDescriptor<>("totals state", ResourceTotals.class);
totalsDescriptor.setQueryable("resource-totals");
totalsState = getRuntimeContext().getState(totalsDescriptor);
}
@Override
public void processElement(Event event, Context ctx, Collector<ResourceTotals> out) throws Exception {
ResourceTotals totals = totalsState.value();
if (totals == null) {
totals = new ResourceTotals();
totals.id = event.id;
}
switch (event.type) {
case "view":
totals.views += event.count;
break;
case "download":
totals.downloads += event.count;
}
totalsState.update(totals);
out.collect(totals);
}
}
从代码中可以明显看出,它会为每个事件发出一个新的 ResourceTotals,但我想每分钟一次而不是更频繁地发出每个资源的总数。
我尝试使用全局 window 和触发器 (ContinuousProcessingTimeTrigger) 进行试验,但无法使其正常工作。我遇到的问题是:
- 如何表达我想要window的最后一个事件?
- 如何不最终存储在该全局 window 中产生的所有 ResourceTotal?
如有任何帮助,我们将不胜感激。
您可以使用计时器每分钟发出一次 totalsState 中的值。由于我在您的数据流中看不到任何时间戳,我想您会使用处理时间计时器。
另一种方法是将 ProcessFunction 替换为 TimeWindow 以及保留最后一个事件的 ReduceFunction。
在任何一种情况下,您都可以考虑通过 ID 和类型字段对流进行键控,这应该会稍微简化您的状态管理。
已更新:
是的,定时器是状态的一部分,由 Flink 设置检查点和恢复状态。