KafkaProducer flush 不会阻塞主线程
KafkaProducer flush does not block the main thread
我与 org.apache.kafka.clients 包中的 KafkaProducer class 有过一段有趣的经历。
根据文档,flush() 方法的执行应该阻止代码执行,直到所有消息都已发送。
我有这样的代码:
messageSender.flush();
messageSender.close();
方法messageSender.flush() 为我所有的生产者执行刷新:
public void flush() {
producers.forEach(Producer::flush);
}
在执行第一个代码块之前,我通过 send() 方法发送了一些消息。
但是结束后,我发现并不是所有消息都在生产者关闭之前发送。
如果我将第一个代码块更改为:
messageSender.flush();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
messageSender.close();
所有消息已发送。
我做错了什么?
我的制作人有这样的配置:
(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
(ProducerConfig.ACKS_CONFIG, "0");
(ProducerConfig.LINGER_MS_CONFIG, "40");
(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
同花顺
您的主线程不会被 producer.flush()
阻塞太久,
那是因为 0
作为 ACK
,生产者将更像一个 UDP
发送者:如果调用了 sent,则消息已成功发送完成,没有任何保证。因此,此配置没有真正的 block-until-received 机制,flush
清空的缓冲区几乎已经是空的。
关闭
另一个应该阻塞主线程直到发送消息的方法是 close()
方法。来自文档:
close()
This method blocks until all previously sent requests complete.
根据这个逻辑,你说代码应该等到所有消息都发送完才对,这是对的。您的配置中的问题似乎是 ACKS=0
属性.
ACKs
With a value of 0, the producer won’t even wait for a response from
the broker. It immediately considers the write successful the moment
the record is sent out.
可能发生的情况是:close()
将等待所有请求完成,但 ACK 为 0 时,它会谎称已发送成功消息。 producer
不会等待确认,因此会假设每条消息都已正确发送; close()
操作将在之后立即执行,对已完成的请求没有任何真正的保证。在正常情况下,这主要影响最后发送的请求。
我会尝试的是:
ACKs = 1
(至少)
这将使 flush()
和 close()
都稍等片刻,因为生产者需要来自代理的确认才能将请求标记为成功。
producer.close(500,TimeUnit.MILLISECONDS)
This close method 等待生产者完成所有未完成请求的发送超时。这也将有助于处理最后的消息,并且是执行您在睡眠中手动执行的操作的“官方”方式。将其设置为您希望的等待时间。
- 重复使用相同的
producer
不仅在src comments中推荐,而且对一般性能也有帮助。
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
我与 org.apache.kafka.clients 包中的 KafkaProducer class 有过一段有趣的经历。 根据文档,flush() 方法的执行应该阻止代码执行,直到所有消息都已发送。
我有这样的代码:
messageSender.flush();
messageSender.close();
方法messageSender.flush() 为我所有的生产者执行刷新:
public void flush() {
producers.forEach(Producer::flush);
}
在执行第一个代码块之前,我通过 send() 方法发送了一些消息。 但是结束后,我发现并不是所有消息都在生产者关闭之前发送。
如果我将第一个代码块更改为:
messageSender.flush();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
messageSender.close();
所有消息已发送。
我做错了什么?
我的制作人有这样的配置:
(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
(ProducerConfig.ACKS_CONFIG, "0");
(ProducerConfig.LINGER_MS_CONFIG, "40");
(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
同花顺
您的主线程不会被 producer.flush()
阻塞太久,
那是因为 0
作为 ACK
,生产者将更像一个 UDP
发送者:如果调用了 sent,则消息已成功发送完成,没有任何保证。因此,此配置没有真正的 block-until-received 机制,flush
清空的缓冲区几乎已经是空的。
关闭
另一个应该阻塞主线程直到发送消息的方法是 close()
方法。来自文档:
close()
This method blocks until all previously sent requests complete.
根据这个逻辑,你说代码应该等到所有消息都发送完才对,这是对的。您的配置中的问题似乎是 ACKS=0
属性.
ACKs
With a value of 0, the producer won’t even wait for a response from the broker. It immediately considers the write successful the moment the record is sent out.
可能发生的情况是:close()
将等待所有请求完成,但 ACK 为 0 时,它会谎称已发送成功消息。 producer
不会等待确认,因此会假设每条消息都已正确发送; close()
操作将在之后立即执行,对已完成的请求没有任何真正的保证。在正常情况下,这主要影响最后发送的请求。
我会尝试的是:
ACKs = 1
(至少)
这将使 flush()
和 close()
都稍等片刻,因为生产者需要来自代理的确认才能将请求标记为成功。
producer.close(500,TimeUnit.MILLISECONDS)
This close method 等待生产者完成所有未完成请求的发送超时。这也将有助于处理最后的消息,并且是执行您在睡眠中手动执行的操作的“官方”方式。将其设置为您希望的等待时间。
- 重复使用相同的
producer
不仅在src comments中推荐,而且对一般性能也有帮助。
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.