Kafka Streams 仅提交 KGroupedTable 的最新消息
Kafka Streams committing just the latest message of KGroupedTable
我有如下 Kafka Streams 应用程序:
static KafkaStreams build(AppConfig appConfig, SerdesHelper serdes) {
final KStreamBuilder builder = new KStreamBuilder();
builder
.table(serdes.sourceKeySerde, serdes.sourceValueSerde, appConfig.sourceTopic)
.groupBy(StreamBuilder::groupByMapper, serdes.intSerde, serdes.longSerde)
.aggregate(
StreamBuilder::initialize,
StreamBuilder::add,
StreamBuilder::subtract,
serdes.sinkValueSerde)
.to(serdes.intSerde, serdes.sinkValueSerde, appConfig.sinkTopic);
return new KafkaStreams(builder, appConfig.streamConfig);
}
我的具体例子分组记录如下
((k, v)) -> ((k), v[])
虽然 运行 这包含 3.000.000 条消息的虚拟数据,只有两个唯一键,但我最终在 sinkTopic
不到一分钟内收到了大约 10.000 条消息,我希望得到要么 4/6(基于我设法停止应用程序的那一刻)。
如何确保只有具有最新分组值的键才会被提交回 Kafka,而不是每个中间消息?
是流处理,不是批处理。没有 "latest grouped value" -- 输入是无限的,因此输出是无限的...
你只能减少中间体的数量
- 增加 KTable 缓存大小(但这对你的情况来说似乎不是问题,因为你只有 2 个唯一键,因此如果你没有禁用缓存或
- 增加提交间隔
我有如下 Kafka Streams 应用程序:
static KafkaStreams build(AppConfig appConfig, SerdesHelper serdes) {
final KStreamBuilder builder = new KStreamBuilder();
builder
.table(serdes.sourceKeySerde, serdes.sourceValueSerde, appConfig.sourceTopic)
.groupBy(StreamBuilder::groupByMapper, serdes.intSerde, serdes.longSerde)
.aggregate(
StreamBuilder::initialize,
StreamBuilder::add,
StreamBuilder::subtract,
serdes.sinkValueSerde)
.to(serdes.intSerde, serdes.sinkValueSerde, appConfig.sinkTopic);
return new KafkaStreams(builder, appConfig.streamConfig);
}
我的具体例子分组记录如下
((k, v)) -> ((k), v[])
虽然 运行 这包含 3.000.000 条消息的虚拟数据,只有两个唯一键,但我最终在 sinkTopic
不到一分钟内收到了大约 10.000 条消息,我希望得到要么 4/6(基于我设法停止应用程序的那一刻)。
如何确保只有具有最新分组值的键才会被提交回 Kafka,而不是每个中间消息?
是流处理,不是批处理。没有 "latest grouped value" -- 输入是无限的,因此输出是无限的...
你只能减少中间体的数量
- 增加 KTable 缓存大小(但这对你的情况来说似乎不是问题,因为你只有 2 个唯一键,因此如果你没有禁用缓存或
- 增加提交间隔