Java Apache Kafka 生产者元数据更新器和重试逻辑
Java Apache Kafka Producer Metadata Updater & Retry Logic
我正在为 Apache Kafka 使用 Spring,并创建了一个服务,该服务通过 Spring 的 KafkaTemplate 使用 Kafka Producer (org.apache.kafka.clients.producer) 向主题发送消息。在目标 Kafka 集群上,我禁用了自动创建主题。使用此处列出的生产者配置的组合 https://kafka.apache.org/documentation/#producerconfigs 我成功地控制了重试请求的次数、重试之间的时间等。
如果我提供了一个不存在的主题,请求会在我期望的时候超时(达到 max.block.ms 的值)。但是,超时后,我继续以 retry.backoff.ms 设置的时间间隔获取日志条目(例如下面的条目),直到 300000 毫秒/5 分钟 已达到。
我无法确定可以更改生产者或代理上的哪个配置 属性 以阻止生产者检查主题是否已创建 5 分钟。
谁能告诉我正确的设置,让我可以减少这个,或者让它在请求超时后停止检查?
日志条目示例:
WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater: [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {<specified_topic>=UNKNOWN_TOPIC_OR_PARTITION}
使用的生产者配置:
- delivery.timeout.ms = 5000
- linger.ms = 1000
- max.block.ms = 8000
- request.timeout.ms= 4000
- max.retry.count = 0
- retry.backoff.ms = 2000
Kafka Producer 在第一个 send
之前检索并缓存 topic/partition 元数据。然后它会定期尝试刷新此元数据,"good" 每 metadata.max.age.ms
(默认=5 分钟)一次,"invalid" 个主题每 retry.backoff.ms
一次。这些元数据刷新尝试是您在日志中观察到的。
为了防止缓存不受控制地增长,根据这些 source comments. Currently, this expiry period is hardcoded in ProducerMetadata.java
5 分钟,未使用的主题会在一定时间后从缓存中删除。
public class ProducerMetadata extends Metadata {
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
...
您实际上可以通过将生产者日志级别设置为 DEBUG
来观察所有这些 activity。
我正在为 Apache Kafka 使用 Spring,并创建了一个服务,该服务通过 Spring 的 KafkaTemplate 使用 Kafka Producer (org.apache.kafka.clients.producer) 向主题发送消息。在目标 Kafka 集群上,我禁用了自动创建主题。使用此处列出的生产者配置的组合 https://kafka.apache.org/documentation/#producerconfigs 我成功地控制了重试请求的次数、重试之间的时间等。
如果我提供了一个不存在的主题,请求会在我期望的时候超时(达到 max.block.ms 的值)。但是,超时后,我继续以 retry.backoff.ms 设置的时间间隔获取日志条目(例如下面的条目),直到 300000 毫秒/5 分钟 已达到。
我无法确定可以更改生产者或代理上的哪个配置 属性 以阻止生产者检查主题是否已创建 5 分钟。
谁能告诉我正确的设置,让我可以减少这个,或者让它在请求超时后停止检查?
日志条目示例:
WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater: [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {<specified_topic>=UNKNOWN_TOPIC_OR_PARTITION}
使用的生产者配置:
- delivery.timeout.ms = 5000
- linger.ms = 1000
- max.block.ms = 8000
- request.timeout.ms= 4000
- max.retry.count = 0
- retry.backoff.ms = 2000
Kafka Producer 在第一个 send
之前检索并缓存 topic/partition 元数据。然后它会定期尝试刷新此元数据,"good" 每 metadata.max.age.ms
(默认=5 分钟)一次,"invalid" 个主题每 retry.backoff.ms
一次。这些元数据刷新尝试是您在日志中观察到的。
为了防止缓存不受控制地增长,根据这些 source comments. Currently, this expiry period is hardcoded in ProducerMetadata.java
5 分钟,未使用的主题会在一定时间后从缓存中删除。
public class ProducerMetadata extends Metadata {
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
...
您实际上可以通过将生产者日志级别设置为 DEBUG
来观察所有这些 activity。