使用 windows 从无限数据流中过滤重复项

Filtering duplicates out of an infinite DataStream with windows

我想从无限数据流中过滤掉 Flink 中的重复项。我知道重复项只会在很短的时间内出现 window(最多 10 秒)。我发现了一种非常简单的有前途的方法 。但它不起作用。它使用键控数据流和 returns 仅每个 window 的第一条消息。 这是我的 window 代码:

DataStream<Row> outputStream = inputStream
                .keyBy(new MyKeySelector())
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.minutes(5)))
                .process(new DuplicateFilter());

MyKeySelector()只是一个class到select的Row消息的前两个属性作为key。此键用作主键并导致只有具有相同键的消息被分配给相同的 window(classic 键控流行为)。

那是 class Duplicate Filter,它与上述问题的建议答案非常相似。我只使用了较新的 process() 函数而不是 apply().

public class DuplicateFilter extends ProcessWindowFunction<Row, Row, Tuple2<String, String>, TimeWindow> {
private static final Logger LOG = LoggerFactory.getLogger(DuplicateFilter.class);

@Override
public void process(Tuple2<String, String> key, Context context, Iterable<Row> iterable, Collector<Row> collector) throws Exception {
    // this is just for debugging and can be ignored
    int count = 0;
    for (Row record :
            iterable) {
        LOG.info("Row number {}: {}", count, record);
        count++;
    }
    LOG.info("first Row: {}", iterable.iterator().next());

    collector.collect(iterable.iterator().next()); //output only the first message in this window
}
}

我的消息以最大间隔到达。一秒钟,所以 30 秒 window 应该可以很好地处理。但是到达距离小于 1 秒的消息被分配到不同的 windows。我从日志中看到的是它很少能正常工作。

有人对此任务有想法或其他方法吗?如果您需要更多信息,请告诉我。

Flink 的时间 windows 是与时钟对齐的,而不是与事件对齐的,因此时间上接近的两个事件可以分配给不同的 windows。 Windows 通常不太适合重复数据删除,但如果您使用会话 windows.

可能会得到很好的结果

就个人而言,我会使用键控平面图(或过程函数),并使用状态 TTL(或计时器)在不再需要时清除键的状态。

你也可以用Flink做去重SQL:https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/table/sql/queries/deduplication/ (but you would need to set an idle state retention interval).