kafka new producer 在其中一个代理关闭后无法更新元数据

kafka new producer is not able to update metadata after one of the broker is down

我有一个 kafka 环境,其中有 2 个代理和 1 个动物园管理员。

当我尝试向 kafka 生成消息时,如果我停止代理 1(领导者),客户端将停止生成消息并给我以下错误,尽管代理 2 被选为新的领导者主题和部分。

org.apache.kafka.common.errors.TimeoutException: 60000 毫秒后无法更新元数据。

10 分钟过去后,由于经纪人 2 是新的领导者,我希望生产者将数据发送给经纪人 2,但它继续失败,给出上述异常。 lastRefreshMs 和 lastSuccessfullRefreshMs 仍然相同,尽管生产者的 metadataExpireMs 是 300000。

我在生产者端使用 kafka new Producer 实现。

似乎当生产者启动时,它会绑定到一个代理,如果该代理出现故障,它甚至不会尝试连接到集群中的另一个代理。

但我的期望是,如果一个代理出现故障,它应该直接检查元数据以寻找另一个可用的代理并向它们发送数据。

顺便说一句,我的主题是 4 个分区,复制因子为 2。提供此信息以防万一。

配置参数。

{request.timeout.ms=30000, retry.backoff.ms=100, buffer.memory=33554432, ssl.truststore.password=null, batch.size=16384, ssl.keymanager.algorithm=SunX509, receive.buffer.bytes=32768, ssl.cipher.suites=null, ssl.key.password=null, sasl.kerberos.ticket.renew.jitter=0.05, ssl.provider=null, sasl.kerberos.service.name=null, max.in.flight.requests.per.connection=5, sasl.kerberos.ticket.renew.window.factor=0.8, bootstrap.servers=[10.201.83.166:9500, 10.201.83.167:9500], client.id=rest-interface, max.request.size=1048576, acks=1, linger.ms=0, sasl.kerberos.kinit.cmd=/usr/bin/kinit, ssl.enabled.protocols=[TLSv1.2, TLSv1.1, TLSv1], metadata.fetch.timeout.ms=60000, ssl.endpoint.identification.algorithm=null, ssl.keystore.location=null, value.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, ssl.truststore.location=null, ssl.keystore.password=null, key.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, block.on.buffer.full=false, metrics.sample.window.ms=30000, metadata.max.age.ms=300000, security.protocol=PLAINTEXT, ssl.protocol=TLS, sasl.kerberos.min.time.before.relogin=60000, timeout.ms=30000, connections.max.idle.ms=540000, ssl.trustmanager.algorithm=PKIX, metric.reporters=[], compression.type=none, ssl.truststore.type=JKS, max.block.ms=60000, retries=0, send.buffer.bytes=131072, partitioner.class=class org.apache.kafka.clients.producer.internals.DefaultPartitioner, reconnect.backoff.ms=50, metrics.num.samples=2, ssl.keystore.type=JKS}

用例:

1- 开始 BR1 和 BR2 生产数据(Leader 是 BR1)

2- 停止 BR2 产生数据(正常)

3- 停止 BR1(这意味着此时集群中没有活动的工作代理)然后启动 BR2 并生成数据(尽管领导者是 BR2 但失败)

4-开始BR1生产数据(leader还是BR2但是数据生产的很好)

5- 停止 BR2(现在 BR1 是领导者)

6- 停止 BR1(BR1 仍然是领导者)

7- 开始 BR1 生成数据(消息再次生成正常)

如果生产者将最新的成功数据发送给 BR1,然后所有代理都宕机了,生产者希望 BR1 再次启动,尽管 BR2 已经启动并成为新的领导者。这是预期的行为吗?

花了几个小时后,我弄清楚了卡夫卡在我的情况下的行为。可能这是一个错误,也可能是因为隐藏在引擎盖下的原因而需要以这种方式完成,但实际上,如果我要这样做,我不会这样做:)

当所有代理都宕机时,如果您只能启动一个代理,则这必须是最后一个宕机才能成功生成消息的代理。

假设您有 5 个经纪人; BR1、BR2、BR3、BR4 和 BR5。如果一切都崩溃了,如果最后死掉的经纪人是 BR3(它是最后一个领导者),尽管你启动了所有经纪人 BR1、BR2、BR4 和 BR5,但除非你启动 BR3,否则它没有任何意义。

您需要增加重试次数。 在您的情况下,您需要将其设置为 >=5.

这是您的生产者知道您的集群有新领导者的唯一方式。

除此之外,请确保您的所有经纪人都有您的分区副本。否则你不会得到一个新的领导者。

在最新的 kafka 版本中,当一个 broker 宕机时,它有一个生产者使用的 leader 分区。生产者将重试直到捕获可重试异常,然后生产者需要更新元数据。新的元数据可以从 leastLoadNode 获取。所以新的领导者将被更新,生产者可以在那里写。