Kafka生产者配置立即发送消息

Kafka producer config to send a message immediately

我正在使用 Kafka 生产者 0.8.2,我正在尝试以立即发送消息的方式向主题发送一条消息。我有一个控制台消费者来观察消息是否到达。我注意到消息不会立即发送,除非我 运行 producer.close() 在发送后立即发送,这不是我想做的。

针对此问题的正确生产者配置设置是什么?我正在使用以下内容(我知道它看起来像是一堆不同的 configurations/versions,但我根本找不到像我在文档中期望的那样工作的东西):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersStr);
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put("producer.type", "sync");
props.put("batch.num.messages", "1");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

我找到了一个看起来合理的解决方案,它涉及对 Producer 的 send() 命令返回的 Future 的 运行 get()。我更改了发送命令:

producer.send(record);

以下内容:

producer.send(record).get();

如果这种方法有任何问题,很高兴听到更有经验的 Kafka 用户的意见?另外,我有兴趣了解生产者是否有一个配置设置来实现同样的事情(即,立即发送一条消息而无需 运行 get() 的未来)。

旧post,但我已经很努力地错过了这里的post。

我在尝试 运行 Kafka 示例时偶然发现了相同的行为,而这个 .get() 是唯一将消息发送到 Kafka 的东西。 The Javadoc for KafkaProducer.send(…) 声明此方法是异步的。在我的测试代码中,消息因此被发送到 Kafka,而我的代码继续 运行 并且实际上刚刚到达 运行 的末尾并在消息实际发送到 Future 之前终止].

所以这个 .get() 只是阻塞在 Future 上,直到它被实现。这实际上消除了 Future 的好处。一种更简洁的方法是在 .send(…) 之后使用 Thread.sleep(…) 稍等片刻(取决于您的用例)。