Kafka producer flush 和 poll 的区别

Kafka producer difference between flush and poll

我们有一个 Kafka 消费者,它将读取消息并执行相关操作,然后使用以下脚本再次发布到 Kafka 主题

生产者配置:

{
  "bootstrap.servers": "localhost:9092"
}

我没有配置任何其他配置 queue.buffering.max.messages queue.buffering.max.ms batch.num.messages

我假设这些都将成为 configuration

的默认值
queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000

我的理解:当内部队列到达 queue.buffering.max.ms 或 batch.num.messages 中的任何一个时,消息将在单独的线程中发布到 Kafka。在我的配置中 queue.buffering.max.ms 是 0,所以每条消息都会在我调用 produce() 时立即发布。如果我错了,请纠正我。

我的制作人片段:

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.flush()

来自 this post 我了解到在每条消息之后使用刷新,生产者将成为同步生产者。如果我使用上面的脚本,它需要大约 45 毫秒才能发布到 Kafka

如果我将上面的片段更改为

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.poll(0)

有什么性能会提高吗?你能解释一下我的理解吗?

谢谢

flush()poll()的区别在客户的documentation中有说明。

对于 flush(),它表示:

Wait for all messages in the Producer queue to be delivered. This is a convenience method that calls poll() until len() is zero or the optional timeout elapses.

对于poll()

Polls the producer for events and calls the corresponding callbacks (if registered).

send() 之后立即调用 poll() 不会使生产者同步,因为刚刚发送的消息不太可能已经到达代理并且交付报告已经发送回客户端。

相反,flush() 将阻塞,直到之前发送的消息已交付(或出错),有效地使生产者同步。