更新版本的 Kafka 生产者是否还有 "producer.type"?

Do newer versions of Kafka producers still have "producer.type"?

旧版本的文档说这是基本属性之一。

较新版本的文档根本没有提及。

更新版本的 Kafka 生产者是否还有 producer.type

或者,新的制作人总是async,我应该打电话给future.get()才能做到sync?

为什么要让 send() 同步?

这是一个 kafka 功能,用于批处理消息以获得更好的吞吐量。

Asynchronous send

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.

由于 api 仅支持异步方法,因此无法进行发送同步,但是您可以指定一些配置来完成一些工作。

您可以将 batch.size 设置为 0。在这种情况下,消息 bacting 被禁用。

但是我认为您应该保留 batch.size 默认值并将 linger.ms 设置为 0 (这也是默认的)。在这种情况下,如果同时有很多消息进来,他们会立即批量发送。

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out.

如果你想确保消息被成功发送和保存,你可以将 acks 设置为 -1 或 1 和 retries 到 3(例如)

更多生产者配置,可以参考https://kafka.apache.org/documentation/#producerconfigs

新生产者总是异步的,您应该调用 future.get() 使其同步。当像添加 future.get() 这样简单的东西为您提供基本相同的功能时,不值得制作两个 api 方法。

来自此处 send() 的文档

https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

Since the send call is asynchronous it returns a Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.

If you want to simulate a simple blocking call you can call the get() method immediately:

byte[] key = "key".getBytes();
byte[] value = "value".getBytes();  
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
producer.send(record).get();