生产者中的 Kafka 数据丢失
Kafka data loss, in producer
我一直在尝试配置一个Kafka broker,一个topic,一个producer,一个consumer。
当生产者生产时,如果 Broker 宕机,就会发生数据丢失,
例如:
In Buffer:
Datum 1 - published
Datum 2 - published
.
. ---->(Broker goes down for a while and reconnects...)
.
Datum 4 - published
Datum 5 - published
为生产者配置的属性是:
bootstrap.servers=localhost:9092
acks=all
retries=1
batch.size=16384
linger.ms=2
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
producer.type=sync
buffer.size=102400
reconnect.interval=30000
request.required.acks=1
数据大小小于配置的缓冲区大小..
帮我知道哪里错了...!
不确定你到底在做什么。我假设您在代理关闭时尝试写入 Kafka 的消息不会被 Kafka 确认。如果一条消息没有acked,说明这条消息没有写入到kafka,producer需要重新尝试写入这条消息。
最简单的方法是相应地设置配置参数 retries
和 retry.backoff.ms
。
在应用程序级别,您还可以在 send(..., Callback)
中注册一个 Callback
以获取有关 success/failure 的通知。如果失败,您可以再次调用 send()
重试发送。
我一直在尝试配置一个Kafka broker,一个topic,一个producer,一个consumer。 当生产者生产时,如果 Broker 宕机,就会发生数据丢失, 例如:
In Buffer:
Datum 1 - published
Datum 2 - published
.
. ---->(Broker goes down for a while and reconnects...)
.
Datum 4 - published
Datum 5 - published
为生产者配置的属性是:
bootstrap.servers=localhost:9092
acks=all
retries=1
batch.size=16384
linger.ms=2
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
producer.type=sync
buffer.size=102400
reconnect.interval=30000
request.required.acks=1
数据大小小于配置的缓冲区大小.. 帮我知道哪里错了...!
不确定你到底在做什么。我假设您在代理关闭时尝试写入 Kafka 的消息不会被 Kafka 确认。如果一条消息没有acked,说明这条消息没有写入到kafka,producer需要重新尝试写入这条消息。
最简单的方法是相应地设置配置参数 retries
和 retry.backoff.ms
。
在应用程序级别,您还可以在 send(..., Callback)
中注册一个 Callback
以获取有关 success/failure 的通知。如果失败,您可以再次调用 send()
重试发送。