Flink count distinct issue
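We currently use a tumbling window to count distinct values. The problem is that when we extend the tumbling window from one day to one month, we can no longer get the distinct count as of now. With a one-month tumbling window, the number we get covers the period starting on the 1st of each month. How can I get the current distinct count right now (today is March 9)?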
package flink.trigger;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomCountDistinctTigger extends Trigger<Object, TimeWindow> {

    // Per key and window: timestamp of the next intermediate firing.
    // ReducingStateDescriptor needs a ReduceFunction, so a small Min reducer is used here.
    private final ReducingStateDescriptor<Long> timeState =
            new ReducingStateDescriptor<>("fire-interval", new Min(), LongSerializer.INSTANCE);

    // Interval between intermediate firings, in milliseconds.
    private final long interval;

    public CustomCountDistinctTigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeState);
        // Register the first intermediate timer only once per key and window.
        if (fireTimestamp.get() == null) {
            long now = ctx.getCurrentProcessingTime();
            // Align the first firing to the next multiple of the interval.
            long nextFireTimestamp = now - (now % interval) + interval;
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            // Timer at the end of the window: emit the final result and purge the window contents.
            return TriggerResult.FIRE_AND_PURGE;
        }
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeState);
        Long nextFire = fireTimestamp.get();
        if (nextFire != null && nextFire == time) {
            // Intermediate firing: emit the current result, keep the window, schedule the next timer.
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // This trigger works on processing time only.
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Remove the pending intermediate timer and its state when the window is destroyed.
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeState);
        Long nextFire = fireTimestamp.get();
        if (nextFire != null) {
            ctx.deleteProcessingTimeTimer(nextFire);
            fireTimestamp.clear();
        }
    }

    // Keeps the smaller of two timestamps.
    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long a, Long b) {
            return Math.min(a, b);
        }
    }
}
Distinct count job:

DataStreamSink<Tuple2<String, Integer>> finalResultStream = keyedStream
        .flatMap(new KPIDistinctDataFlatMapFunction(inputSchema))
        .map(new SwapMap())
        .keyBy(new WordKeySelector())
        .window(TumblingProcessingTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
        // Trigger interval in milliseconds: 1 * 60 * 6000 = 360,000 ms (6 minutes),
        // which is longer than the 5-minute window above.
        .trigger(new CustomCountDistinctTigger(1 * 60 * 6000))
        .aggregate(new DistinctCountAggregateFunction())
        .print("final print");
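You can define a custom Trigger that returns FIRE once a day to emit intermediate results, and then returns FIRE_AND_PURGE at the end of the month to close the window.
Every time the Trigger returns FIRE, your window is evaluated by calling the process() method of your ProcessWindowFunction, at which point it can emit results using the Collector it is given. FIRE_AND_PURGE evaluates the window one last time and then destroys it.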
See also the answer to this question, which covers related topics.
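To make the firing pattern described above concrete, here is a minimal sketch in plain Java with no Flink dependency. It only simulates the timer schedule such a trigger would produce: intermediate FIRE results aligned to the interval, then a single FIRE_AND_PURGE at the window's last timestamp. The names TriggerScheduleSketch, simulate, Firing and Result are hypothetical and exist only for this illustration. Timestamps are simplified to milliseconds counted from the start of the month rather than real epoch times, and the sketch assumes no further surprises such as late timers.

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerScheduleSketch {

    /** Simplified stand-in for the two trigger results that matter here. */
    public enum Result { FIRE, FIRE_AND_PURGE }

    /** One timer callback: when it happens and what the trigger would return. */
    public static final class Firing {
        public final long time;
        public final Result result;

        Firing(long time, Result result) {
            this.time = time;
            this.result = result;
        }
    }

    /**
     * Simulates the timers for one window [windowStart, windowEnd).
     * The first intermediate timer is aligned to the next multiple of
     * intervalMs after the first element arrives.
     */
    public static List<Firing> simulate(long windowStart, long windowEnd,
                                        long intervalMs, long firstElementTime) {
        if (firstElementTime < windowStart || firstElementTime >= windowEnd) {
            throw new IllegalArgumentException("first element must fall inside the window");
        }
        List<Firing> firings = new ArrayList<>();
        long maxTimestamp = windowEnd - 1; // last timestamp that belongs to the window
        long next = firstElementTime - (firstElementTime % intervalMs) + intervalMs;
        while (next < maxTimestamp) {
            // Intermediate result: the window contents are kept.
            firings.add(new Firing(next, Result.FIRE));
            next += intervalMs;
        }
        // Final result: the window is evaluated one last time and then purged.
        firings.add(new Firing(maxTimestamp, Result.FIRE_AND_PURGE));
        return firings;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        // A 31-day window, daily firings, first element on the 9th day at 10:00.
        List<Firing> firings = simulate(0L, 31 * day, day, 8 * day + 10L * 60 * 60 * 1000);
        for (Firing f : firings) {
            System.out.println(f.time + " ms -> " + f.result);
        }
    }
}
```

With a one-month window and a one-day interval, every FIRE entry corresponds to a moment at which the distinct count accumulated so far would be emitted, which is what the question asks for on March 9.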