Java Apache Kafka 生产者元数据更新器和重试逻辑

Java Apache Kafka Producer Metadata Updater & Retry Logic

我正在为 Apache Kafka 使用 Spring,并创建了一个服务,该服务通过 Spring 的 KafkaTemplate 使用 Kafka Producer (org.apache.kafka.clients.producer) 向主题发送消息。在目标 Kafka 集群上,我禁用了自动创建主题。使用此处列出的生产者配置的组合 https://kafka.apache.org/documentation/#producerconfigs 我成功地控制了重试请求的次数、重试之间的时间等。

如果我提供了一个不存在的主题,请求会在我期望的时候超时(达到 max.block.ms 的值)。但是,超时后,我继续以 retry.backoff.ms 设置的时间间隔获取日志条目(例如下面的条目),直到 300000 毫秒/5 分钟 已达到。

我无法确定可以更改生产者或代理上的哪个配置 属性 以阻止生产者检查主题是否已创建 5 分钟。

谁能告诉我正确的设置,让我可以减少这个,或者让它在请求超时后停止检查?

日志条目示例:

WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater: [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {<specified_topic>=UNKNOWN_TOPIC_OR_PARTITION}

使用的生产者配置:

Kafka Producer 在第一个 send 之前检索并缓存 topic/partition 元数据。然后它会定期尝试刷新此元数据,"good" 每 metadata.max.age.ms(默认=5 分钟)一次,"invalid" 个主题每 retry.backoff.ms 一次。这些元数据刷新尝试是您在日志中观察到的。

为了防止缓存不受控制地增长,根据这些 source comments. Currently, this expiry period is hardcoded in ProducerMetadata.java 5 分钟,未使用的主题会在一定时间后从缓存中删除。

  public class ProducerMetadata extends Metadata {
      private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
      static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
        ...

您实际上可以通过将生产者日志级别设置为 DEBUG 来观察所有这些 activity。