Kafka Streams 仅提交 KGroupedTable 的最新消息

Kafka Streams committing just the latest message of KGroupedTable

我有如下 Kafka Streams 应用程序:

static KafkaStreams build(AppConfig appConfig, SerdesHelper serdes) {
  final KStreamBuilder builder = new KStreamBuilder();

  builder
      .table(serdes.sourceKeySerde, serdes.sourceValueSerde, appConfig.sourceTopic)
      .groupBy(StreamBuilder::groupByMapper, serdes.intSerde, serdes.longSerde)
      .aggregate(
          StreamBuilder::initialize,
          StreamBuilder::add,
          StreamBuilder::subtract,
          serdes.sinkValueSerde)
      .to(serdes.intSerde, serdes.sinkValueSerde, appConfig.sinkTopic);

  return new KafkaStreams(builder, appConfig.streamConfig);
}

我的具体例子分组记录如下

((k, v)) -> ((k), v[])

虽然 运行 这包含 3.000.000 条消息的虚拟数据,只有两个唯一键,但我最终在 sinkTopic 不到一分钟内收到了大约 10.000 条消息,我希望得到要么 4/6(基于我设法停止应用程序的那一刻)。

如何确保只有具有最新分组值的键才会被提交回 Kafka,而不是每个中间消息?

是流处理,不是批处理。没有 "latest grouped value" -- 输入是无限的,因此输出是无限的...

你只能减少中间体的数量

  1. 增加 KTable 缓存大小(但这对你的情况来说似乎不是问题,因为你只有 2 个唯一键,因此如果你没有禁用缓存或
  2. 增加提交间隔