Sprint Cloud Stream Kafka Streams Binder 处理器应用卡住
Sprint Cloud Stream Kafka Streams Binder processor application stuck
我有以下 Spring Cloud Stream Kafka Streams Binder 3.x 应用程序:
当我 运行 X 消息通过此应用程序发布到 topic1
从集成测试使用 @SpringBootTest
和 @EmbeddedKafka
点 1 的消息计数和 2 相等,如我所料。
当我使用连接到 Kafka 代理的实时应用程序执行相同操作时,第 1 点和第 2 点的计数仍然存在显着差异:Count1 >> Count2
。
Kafka 工具显示 Processor2
消费者在 topic2
上有很大的滞后,并且该滞后保持不变(在我停止发布消息后不会改变)
Processor2
包括
- flatTransform 状态转换器
- 聚合器
- 其他下游步骤
在测试和实时模式下出现明显的行为以及在实时模式下延迟没有下降的原因可能是什么?
我已经彻底比较了所有应用程序 属性 在测试和实时应用程序中激活的值,它们完全相同。
在这两种情况下,所有主题都只有 1 个分区。
在我的例子中,原因是由 Spring Cloud Stream 应用程序自动创建的主题的默认 7 天保留设置。
我的输入流中的消息跨越 8 年,我使用的是自定义 TimestampExtractor。
我手动将主题配置为较长的保留时间后,问题解决了:
/usr/bin/kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic2 --add-config retention.hours=87600
或者为整个 Kafka broker 设置 log.retention.hours
。
我有以下 Spring Cloud Stream Kafka Streams Binder 3.x 应用程序:
当我 运行 X 消息通过此应用程序发布到 topic1
从集成测试使用 @SpringBootTest
和 @EmbeddedKafka
点 1 的消息计数和 2 相等,如我所料。
当我使用连接到 Kafka 代理的实时应用程序执行相同操作时,第 1 点和第 2 点的计数仍然存在显着差异:Count1 >> Count2
。
Kafka 工具显示 Processor2
消费者在 topic2
上有很大的滞后,并且该滞后保持不变(在我停止发布消息后不会改变)
Processor2
包括
- flatTransform 状态转换器
- 聚合器
- 其他下游步骤
在测试和实时模式下出现明显的行为以及在实时模式下延迟没有下降的原因可能是什么?
我已经彻底比较了所有应用程序 属性 在测试和实时应用程序中激活的值,它们完全相同。
在这两种情况下,所有主题都只有 1 个分区。
在我的例子中,原因是由 Spring Cloud Stream 应用程序自动创建的主题的默认 7 天保留设置。
我的输入流中的消息跨越 8 年,我使用的是自定义 TimestampExtractor。
我手动将主题配置为较长的保留时间后,问题解决了:
/usr/bin/kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic2 --add-config retention.hours=87600
或者为整个 Kafka broker 设置 log.retention.hours
。