All messages in out queue failed to be delivered to broker when kafka brokers restarted
I am working with the C++ Kafka client librdkafka; my code is based on the example at https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp. My program writes 2,000,000 messages to the broker, and during the run I restart the broker. Sometimes no messages fail to be delivered; other times about 100,000 messages fail, which matches queue.buffering.max.messages=100000. It looks as if every message sitting in the out queue is lost?
The error is RdKafka::Message delivery report: Local: Unknown partition.
I have since found new problems: (1) sometimes roughly 200 messages are delivered to the broker twice; (2) sometimes a message has already been delivered to the broker, yet dr_cb() is still invoked and reports that the message failed to be delivered. I am trying to work out whether this is a broker problem or a client problem. Has anyone seen similar issues? What I need is reliable transmission and accurate delivery reports between client and broker. I am now considering the C client instead, though I am not sure whether the same problem would occur there...
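Roughly, my producing loop follows the rdkafka_example.cpp pattern. A minimal sketch (broker address and topic name as in the config below; everything else is simplified, not my exact code):

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

// Delivery report callback: librdkafka invokes this from poll(),
// once per message, with the final delivery status.
class DrCb : public RdKafka::DeliveryReportCb {
 public:
  void dr_cb(RdKafka::Message &message) {
    if (message.err() != RdKafka::ERR_NO_ERROR)
      std::cerr << "Delivery failed: " << message.errstr() << std::endl;
  }
};

int main() {
  std::string errstr;
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  conf->set("metadata.broker.list", "localhost:9092", errstr);

  DrCb dr_cb;
  conf->set("dr_cb", &dr_cb, errstr);

  RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
  RdKafka::Topic *topic =
      RdKafka::Topic::create(producer, "test", NULL, errstr);

  for (int i = 0; i < 2000000; i++) {
    std::string payload = "message " + std::to_string(i);
    // RK_MSG_COPY: librdkafka copies the payload into its out queue.
    while (producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                             RdKafka::Producer::RK_MSG_COPY,
                             const_cast<char *>(payload.data()),
                             payload.size(), NULL /* key */,
                             NULL /* opaque */) == RdKafka::ERR__QUEUE_FULL) {
      producer->poll(100);  // out queue full: serve delivery reports, retry
    }
    producer->poll(0);  // serve pending delivery report callbacks
  }

  // Drain: wait until every queued message is delivered or has failed.
  while (producer->outq_len() > 0)
    producer->poll(100);

  delete topic;
  delete producer;
  return 0;
}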
The broker log is:
[2015-07-21 17:48:33,471] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-07-21 17:48:33,717] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-07-21 17:48:33,718] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 5017; ClientId: rdkafka; Topics: test (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
at kafka.server.KafkaApis$$anonfun.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:194)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.immutable.Set$Set1.map(Set.scala:73)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:745)
[2015-07-21 17:48:33,743] INFO Registered broker 0 at path /brokers/ids/0 with address cyclops-9803:9092. (kafka.utils.ZkUtils$)
[2015-07-21 17:48:33,759] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-07-21 17:48:33,803] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2015-07-21 17:48:33,858] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,000] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,017] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
My producer configuration is:
Global config
client.id=rdkafka
metadata.broker.list=localhost:9092
message.max.bytes=4000000
receive.message.max.bytes=100000000
metadata.request.timeout.ms=900000
topic.metadata.refresh.interval.ms=-1
topic.metadata.refresh.fast.cnt=10
topic.metadata.refresh.fast.interval.ms=250
topic.metadata.refresh.sparse=false
socket.timeout.ms=300000
socket.send.buffer.bytes=0
socket.receive.buffer.bytes=0
socket.keepalive.enable=false
socket.max.fails=10
broker.address.ttl=300000
broker.address.family=any
statistics.interval.ms=0
error_cb=0x5288a60
stats_cb=0x5288ba0
log_cb=0x54942a0
log_level=6
socket_cb=0x549e6c0
open_cb=0x54acf90
opaque=0x9167898
internal.termination.signal=0
queued.min.messages=100000
queued.max.messages.kbytes=1000000
fetch.wait.max.ms=100
fetch.message.max.bytes=1048576
fetch.min.bytes=1
fetch.error.backoff.ms=500
queue.buffering.max.messages=100000
queue.buffering.max.ms=1000
message.send.max.retries=10
retry.backoff.ms=100
compression.codec=none
batch.num.messages=1000
delivery.report.only.error=true
Topic config
request.required.acks=1
enforce.isr.cnt=0
request.timeout.ms=5000
message.timeout.ms=300000
produce.offset.report=false
auto.commit.enable=true
auto.commit.interval.ms=60000
auto.offset.reset=largest
offset.store.path=.
offset.store.sync.interval.ms=-1
offset.store.method=file
consume.callback.max.messages=0
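For completeness, these settings are applied through the C++ Conf API, roughly like this (a sketch showing only a few of the values above, not my exact code):

std::string errstr;
RdKafka::Conf *conf  = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

// Global producer settings (values as dumped above)
conf->set("metadata.broker.list", "localhost:9092", errstr);
conf->set("queue.buffering.max.messages", "100000", errstr);
conf->set("message.send.max.retries", "10", errstr);
conf->set("retry.backoff.ms", "100", errstr);

// Topic-level settings
tconf->set("request.required.acks", "1", errstr);    // ack from leader only
tconf->set("message.timeout.ms", "300000", errstr);  // local delivery timeout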
The consumer output is:
[2015-07-22 20:57:21,052] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:cyclops-9803,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-07-22 20:57:21,073] WARN [console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cyclops-9803,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
Any suggestions are welcome. Thanks.
In async mode, the client has to handle this kind of problem itself. I don't know how to guarantee that a message in the out queue reaches the broker with 100% certainty. What we can do is keep track of the messages in the out queue: when delivery fails, dr_cb() is invoked, and in that callback we try to put the message back into the out queue. This may not be the best approach, but it is how I use it for now.
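A minimal sketch of this re-enqueue pattern (the producer and topic handles are assumed to outlive the callback, and error handling of the re-produce call is omitted):

#include <librdkafka/rdkafkacpp.h>

// On a failed delivery report, put a copy of the payload back on the
// out queue. Note this gives at-least-once delivery: combined with the
// client's own retries it can reorder and duplicate messages, which may
// explain the ~200 duplicates observed in the question.
class RequeueDrCb : public RdKafka::DeliveryReportCb {
 public:
  RequeueDrCb(RdKafka::Producer *producer, RdKafka::Topic *topic)
      : producer_(producer), topic_(topic) {}

  void dr_cb(RdKafka::Message &message) {
    if (message.err() == RdKafka::ERR_NO_ERROR)
      return;  // delivered successfully, nothing to do
    producer_->produce(topic_, RdKafka::Topic::PARTITION_UA,
                       RdKafka::Producer::RK_MSG_COPY,
                       message.payload(), message.len(),
                       NULL /* key */, NULL /* opaque */);
  }

 private:
  RdKafka::Producer *producer_;
  RdKafka::Topic *topic_;
};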