使用 Kafka Streams DSL 的 2 步窗口聚合
2 step windowed aggregation with Kafka Streams DSL
假设我有一个流 "stream-1",每秒包含 1 个数据点,我想计算一个派生流 "stream-5",其中包含使用 5 的跳跃 window 的总和秒和另一个基于 "stream-5" 的流 "stream-10" 包含使用 10 秒的跳跃 window 的总和。聚合需要分别为每个键完成,我希望能够 运行 不同过程中的每个步骤。如果 stream-5 和 stream-10 包含相同 key/timestamp 的更新(所以我不一定需要 ),这本身不是问题,只要最后一个值是正确的。
有没有一种(简单的)方法可以使用高级 Kafka Streams DSL 来解决这个问题?到目前为止,我还没有找到一种优雅的方式来处理由于聚合而在 stream-5 上产生的中间更新。
我知道可以通过 cache.max.bytes.buffering
和 commit.interval.ms
设置以某种方式控制中间更新,但我认为任何设置都不能保证在所有情况下都不会产生中间值。我也可以尝试使用密钥的时间戳部分在读取时将 "stream-5" 转换为 KTable,但似乎 KTable 不支持像 KStreams 那样的 windowing 操作。
这是我目前所拥有的,但由于 stream-5 上的中间聚合值而失败了
Reducer<DataPoint> sum = new Reducer<DataPoint>() {
@Override
public DataPoint apply(DataPoint x, DataPoint y) {
return new DataPoint(x.timestamp, x.value + y.value);
}
};
KeyValueMapper<Windowed<String>, DataPoint, String> strip = new
KeyValueMapper<Windowed<String>, DataPoint, String>() {
@Override
public String apply(Windowed<String> wKey, DataPoint arg1) {
return wKey.key();
}
};
KStream<String, DataPoint> s1 = builder.stream("stream-1");
s1.groupByKey()
.reduce(sum, TimeWindows.of(5000).advanceBy(5000))
.toStream()
.selectKey(strip)
.to("stream-5");
KStream<String, DataPoint> s5 = builder.stream("stream-5");
s5.groupByKey()
.reduce(sum, TimeWindows.of(10000).advanceBy(10000))
.toStream()
.selectKey(strip)
.to("stream-10");
现在如果 stream-1 包含输入(键就是 KEY)
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":1000,"value":1.0}
KEY {"timestamp":2000,"value":1.0}
KEY {"timestamp":3000,"value":1.0}
KEY {"timestamp":4000,"value":1.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":6000,"value":1.0}
KEY {"timestamp":7000,"value":1.0}
KEY {"timestamp":8000,"value":1.0}
KEY {"timestamp":9000,"value":1.0}
stream-5 包含正确的(最终)值:
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":2.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":4.0}
KEY {"timestamp":0,"value":5.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":5000,"value":2.0}
KEY {"timestamp":5000,"value":3.0}
KEY {"timestamp":5000,"value":4.0}
KEY {"timestamp":5000,"value":5.0}
但是 stream-10 是错误的(最终值应该是 10.0),因为它还考虑了 stream-5 的中间值:
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":6.0}
KEY {"timestamp":0,"value":10.0}
KEY {"timestamp":0,"value":15.0}
KEY {"timestamp":0,"value":21.0}
KEY {"timestamp":0,"value":28.0}
KEY {"timestamp":0,"value":36.0}
KEY {"timestamp":0,"value":45.0}
KEY {"timestamp":0,"value":55.0}
问题是所有聚合的结果都是 KTables,这意味着为它们的输出主题生成的记录代表一个变更日志。但是,当您随后将它们作为流加载时,下游聚合将重复计算。
相反,您需要将中间主题加载为表,而不是流。但是,您将无法在它们上使用窗口聚合,因为它们只能在流上使用。
您可以使用以下模式来完成对表而不是流的窗口聚合:
如果您想要 运行 单独流程中的每个步骤,您可以调整它,只需记住使用 builder.table() 而不是 builder.stream() 加载中间表。
假设我有一个流 "stream-1",每秒包含 1 个数据点,我想计算一个派生流 "stream-5",其中包含使用 5 的跳跃 window 的总和秒和另一个基于 "stream-5" 的流 "stream-10" 包含使用 10 秒的跳跃 window 的总和。聚合需要分别为每个键完成,我希望能够 运行 不同过程中的每个步骤。如果 stream-5 和 stream-10 包含相同 key/timestamp 的更新(所以我不一定需要
有没有一种(简单的)方法可以使用高级 Kafka Streams DSL 来解决这个问题?到目前为止,我还没有找到一种优雅的方式来处理由于聚合而在 stream-5 上产生的中间更新。
我知道可以通过 cache.max.bytes.buffering
和 commit.interval.ms
设置以某种方式控制中间更新,但我认为任何设置都不能保证在所有情况下都不会产生中间值。我也可以尝试使用密钥的时间戳部分在读取时将 "stream-5" 转换为 KTable,但似乎 KTable 不支持像 KStreams 那样的 windowing 操作。
这是我目前所拥有的,但由于 stream-5 上的中间聚合值而失败了
Reducer<DataPoint> sum = new Reducer<DataPoint>() {
@Override
public DataPoint apply(DataPoint x, DataPoint y) {
return new DataPoint(x.timestamp, x.value + y.value);
}
};
KeyValueMapper<Windowed<String>, DataPoint, String> strip = new
KeyValueMapper<Windowed<String>, DataPoint, String>() {
@Override
public String apply(Windowed<String> wKey, DataPoint arg1) {
return wKey.key();
}
};
KStream<String, DataPoint> s1 = builder.stream("stream-1");
s1.groupByKey()
.reduce(sum, TimeWindows.of(5000).advanceBy(5000))
.toStream()
.selectKey(strip)
.to("stream-5");
KStream<String, DataPoint> s5 = builder.stream("stream-5");
s5.groupByKey()
.reduce(sum, TimeWindows.of(10000).advanceBy(10000))
.toStream()
.selectKey(strip)
.to("stream-10");
现在如果 stream-1 包含输入(键就是 KEY)
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":1000,"value":1.0}
KEY {"timestamp":2000,"value":1.0}
KEY {"timestamp":3000,"value":1.0}
KEY {"timestamp":4000,"value":1.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":6000,"value":1.0}
KEY {"timestamp":7000,"value":1.0}
KEY {"timestamp":8000,"value":1.0}
KEY {"timestamp":9000,"value":1.0}
stream-5 包含正确的(最终)值:
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":2.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":4.0}
KEY {"timestamp":0,"value":5.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":5000,"value":2.0}
KEY {"timestamp":5000,"value":3.0}
KEY {"timestamp":5000,"value":4.0}
KEY {"timestamp":5000,"value":5.0}
但是 stream-10 是错误的(最终值应该是 10.0),因为它还考虑了 stream-5 的中间值:
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":6.0}
KEY {"timestamp":0,"value":10.0}
KEY {"timestamp":0,"value":15.0}
KEY {"timestamp":0,"value":21.0}
KEY {"timestamp":0,"value":28.0}
KEY {"timestamp":0,"value":36.0}
KEY {"timestamp":0,"value":45.0}
KEY {"timestamp":0,"value":55.0}
问题是所有聚合的结果都是 KTables,这意味着为它们的输出主题生成的记录代表一个变更日志。但是,当您随后将它们作为流加载时,下游聚合将重复计算。
相反,您需要将中间主题加载为表,而不是流。但是,您将无法在它们上使用窗口聚合,因为它们只能在流上使用。
您可以使用以下模式来完成对表而不是流的窗口聚合:
如果您想要 运行 单独流程中的每个步骤,您可以调整它,只需记住使用 builder.table() 而不是 builder.stream() 加载中间表。