Apache Flink 流 window WordCount

Apache Flink Streaming window WordCount

我有以下代码来计算来自 socketTextStream 的字数。累积字数和时间窗口字数都需要。该程序有一个问题,即 cumulateCounts 始终与窗口计数相同。为什么会出现这个问题?根据窗口计数计算累积计数的正确方法是什么?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();

final DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .window(Time.of(5, TimeUnit.SECONDS))
            .groupBy(0).sum(1)
            .flatten();

counts.print();

counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        String word = value.f0;
        Integer delta_count = value.f1;
        Integer count = cumulateCounts.get(word);
        if (count == null)
            count = 0;
        count = count + delta_count;
        cumulateCounts.put(word, count);
        System.out.println("(" + word + "," + count.toString() + ")");
    }
});

你应该首先分组,然后在键控数据流上应用window(你的代码适用于 Flink 0.9.1 但 Flink 0.10.0 中新的 API 是严格的这个):

final DataStream<Tuple2<String, Integer>> counts = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter())
        .groupBy(0)
        .window(Time.of(5, TimeUnit.SECONDS)).sum(1)
        .flatten();

如果您在非键控数据流上应用 window,则单台机器上将只有一个单线程 window 运算符(即没有并行性)来构建 window 在整个流上(在 Flink 0.9.1 中,这个全局 window 可以通过 groupBy() 拆分为子 windows -- 然而,在 Flink 0.10.0 中,这不会再工作)。要计算单词数,您需要为每个不同的键值构建一个 window,即,您首先为每个键值获取一个子流(通过 groupBy())并应用 window 运算符每个子流(因此,您可以为每个子流拥有自己的 window 运算符实例,从而允许并行执行)。

对于全局(累计)计数,您可以简单地应用 groupBy().sum() 构造。首先,流被分成子流(每个键值一个)。其次,计算流的总和。因为流是 not windowed,总和计算(累积)并为每个传入元组更新(更详细地说,总和的初始结果值为零,每个元组的结果更新为 result += tuple.value)。每次调用 sum 后,都会发出新的当前结果。

在您的代码中,您不应使用您的特殊接收器功能,而应按如下方式操作:

counts.groupBy(0).sum(1).print();