Apache Flink 流 window WordCount
Apache Flink Streaming window WordCount
我有以下代码来计算来自 socketTextStream 的字数。累积字数和时间窗口字数都需要。该程序有一个问题,即 cumulateCounts 始终与窗口计数相同。为什么会出现这个问题?根据窗口计数计算累积计数的正确方法是什么?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();
final DataStream<Tuple2<String, Integer>> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.window(Time.of(5, TimeUnit.SECONDS))
.groupBy(0).sum(1)
.flatten();
counts.print();
counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
String word = value.f0;
Integer delta_count = value.f1;
Integer count = cumulateCounts.get(word);
if (count == null)
count = 0;
count = count + delta_count;
cumulateCounts.put(word, count);
System.out.println("(" + word + "," + count.toString() + ")");
}
});
你应该首先分组,然后在键控数据流上应用window(你的代码适用于 Flink 0.9.1 但 Flink 0.10.0 中新的 API 是严格的这个):
final DataStream<Tuple2<String, Integer>> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.groupBy(0)
.window(Time.of(5, TimeUnit.SECONDS)).sum(1)
.flatten();
如果您在非键控数据流上应用 window,则单台机器上将只有一个单线程 window 运算符(即没有并行性)来构建 window 在整个流上(在 Flink 0.9.1 中,这个全局 window 可以通过 groupBy()
拆分为子 windows -- 然而,在 Flink 0.10.0 中,这不会再工作)。要计算单词数,您需要为每个不同的键值构建一个 window,即,您首先为每个键值获取一个子流(通过 groupBy()
)并应用 window 运算符每个子流(因此,您可以为每个子流拥有自己的 window 运算符实例,从而允许并行执行)。
对于全局(累计)计数,您可以简单地应用 groupBy().sum()
构造。首先,流被分成子流(每个键值一个)。其次,计算流的总和。因为流是 not windowed,总和计算(累积)并为每个传入元组更新(更详细地说,总和的初始结果值为零,每个元组的结果更新为 result += tuple.value
)。每次调用 sum 后,都会发出新的当前结果。
在您的代码中,您不应使用您的特殊接收器功能,而应按如下方式操作:
counts.groupBy(0).sum(1).print();
我有以下代码来计算来自 socketTextStream 的字数。累积字数和时间窗口字数都需要。该程序有一个问题,即 cumulateCounts 始终与窗口计数相同。为什么会出现这个问题?根据窗口计数计算累积计数的正确方法是什么?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();
final DataStream<Tuple2<String, Integer>> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.window(Time.of(5, TimeUnit.SECONDS))
.groupBy(0).sum(1)
.flatten();
counts.print();
counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
String word = value.f0;
Integer delta_count = value.f1;
Integer count = cumulateCounts.get(word);
if (count == null)
count = 0;
count = count + delta_count;
cumulateCounts.put(word, count);
System.out.println("(" + word + "," + count.toString() + ")");
}
});
你应该首先分组,然后在键控数据流上应用window(你的代码适用于 Flink 0.9.1 但 Flink 0.10.0 中新的 API 是严格的这个):
final DataStream<Tuple2<String, Integer>> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.groupBy(0)
.window(Time.of(5, TimeUnit.SECONDS)).sum(1)
.flatten();
如果您在非键控数据流上应用 window,则单台机器上将只有一个单线程 window 运算符(即没有并行性)来构建 window 在整个流上(在 Flink 0.9.1 中,这个全局 window 可以通过 groupBy()
拆分为子 windows -- 然而,在 Flink 0.10.0 中,这不会再工作)。要计算单词数,您需要为每个不同的键值构建一个 window,即,您首先为每个键值获取一个子流(通过 groupBy()
)并应用 window 运算符每个子流(因此,您可以为每个子流拥有自己的 window 运算符实例,从而允许并行执行)。
对于全局(累计)计数,您可以简单地应用 groupBy().sum()
构造。首先,流被分成子流(每个键值一个)。其次,计算流的总和。因为流是 not windowed,总和计算(累积)并为每个传入元组更新(更详细地说,总和的初始结果值为零,每个元组的结果更新为 result += tuple.value
)。每次调用 sum 后,都会发出新的当前结果。
在您的代码中,您不应使用您的特殊接收器功能,而应按如下方式操作:
counts.groupBy(0).sum(1).print();