KafkaProducer flush 不会阻塞主线程

KafkaProducer flush does not block the main thread

我与 org.apache.kafka.clients 包中的 KafkaProducer class 有过一段有趣的经历。 根据文档,flush() 方法的执行应该阻止代码执行,直到所有消息都已发送。

我有这样的代码:

messageSender.flush();
messageSender.close();

方法messageSender.flush() 为我所有的生产者执行刷新:

public void flush() {
        producers.forEach(Producer::flush);
    }

在执行第一个代码块之前,我通过 send() 方法发送了一些消息。 但是结束后,我发现并不是所有消息都在生产者关闭之前发送。

如果我将第一个代码块更改为:

messageSender.flush();
try {
    Thread.sleep(500);
} catch (InterruptedException e) {
    e.printStackTrace();
}
messageSender.close();

所有消息已发送。

我做错了什么?

我的制作人有这样的配置:

(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
(ProducerConfig.ACKS_CONFIG, "0");
(ProducerConfig.LINGER_MS_CONFIG, "40");
(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

同花顺

您的主线程不会被 producer.flush() 阻塞太久, 那是因为 0 作为 ACK,生产者将更像一个 UDP 发送者:如果调用了 sent,则消息已成功发送完成,没有任何保证。因此,此配置没有真正的 block-until-received 机制,flush 清空的缓冲区几乎已经是空的。


关闭

另一个应该阻塞主线程直到发送消息的方法是 close() 方法。来自文档:

close()

This method blocks until all previously sent requests complete.


根据这个逻辑,你说代码应该等到所有消息都发送完才对,这是对的。您的配置中的问题似乎是 ACKS=0 属性.

ACKs

With a value of 0, the producer won’t even wait for a response from the broker. It immediately considers the write successful the moment the record is sent out.

可能发生的情况是:close() 将等待所有请求完成,但 ACK 为 0 时,它会谎称已发送成功消息。 producer 不会等待确认,因此会假设每条消息都已正确发送; close() 操作将在之后立即执行,对已完成的请求没有任何真正的保证。在正常情况下,这主要影响最后发送的请求。


我会尝试的是:

  • ACKs = 1至少

这将使 flush()close() 都稍等片刻,因为生产者需要来自代理的确认才能将请求标记为成功。

  • producer.close(500,TimeUnit.MILLISECONDS)

This close method 等待生产者完成所有未完成请求的发送超时。这也将有助于处理最后的消息,并且是执行您在睡眠中手动执行的操作的“官方”方式。将其设置为您希望的等待时间。

  • 重复使用相同的 producer

不仅在src comments中推荐,而且对一般性能也有帮助。

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.