防止Confluent Kafka在生产时丢失消息
Prevent Confluent Kafka from losing messages when producing
Confluent Kafka 库(在本例中为 python 版本)有一个采用传递回调函数的生产方法:
kafka_producer.produce(topic=topic,
key=key,
value=value,
on_delivery=delivery_callback)
无论是否成功发送消息都会调用此回调:
def delivery_callback(err, msg):
如果消息失败,我在此函数中没有任何重试逻辑,因为文档说它是异步的。
相反,每 100 条消息左右,我依靠 flush()
来告诉我是否有任何消息未成功生成:
messages_outstanding = kafka_producer.flush()
if messages_outstanding == 0:
//continue to the next batch of 100
else:
//produce the batch again
flush()
是否会解释未能生成的任何消息? (在 delivery_callback
中报告为错误)
换句话说,如果任何消息失败,我能确定flush()
不会return零吗?
确认了以下结果:
调用.flush()
绝对可以return零,即使消息未能生成。此方法似乎要等到所有消息的所有传递回调都完成(回调可以简单地报告消息传递失败)。
从我们的角度来看,整件事出奇地尴尬。如果您不能承受丢失消息的后果,则需要检测传递回调何时失败,并实施某种形式的重试逻辑来覆盖失败的消息。
Confluent Kafka 库(在本例中为 python 版本)有一个采用传递回调函数的生产方法:
kafka_producer.produce(topic=topic,
key=key,
value=value,
on_delivery=delivery_callback)
无论是否成功发送消息都会调用此回调:
def delivery_callback(err, msg):
如果消息失败,我在此函数中没有任何重试逻辑,因为文档说它是异步的。
相反,每 100 条消息左右,我依靠 flush()
来告诉我是否有任何消息未成功生成:
messages_outstanding = kafka_producer.flush()
if messages_outstanding == 0:
//continue to the next batch of 100
else:
//produce the batch again
flush()
是否会解释未能生成的任何消息? (在 delivery_callback
中报告为错误)
换句话说,如果任何消息失败,我能确定flush()
不会return零吗?
确认了以下结果:
调用.flush()
绝对可以return零,即使消息未能生成。此方法似乎要等到所有消息的所有传递回调都完成(回调可以简单地报告消息传递失败)。
从我们的角度来看,整件事出奇地尴尬。如果您不能承受丢失消息的后果,则需要检测传递回调何时失败,并实施某种形式的重试逻辑来覆盖失败的消息。