Kafka生产者配置立即发送消息
Kafka producer config to send a message immediately
我正在使用 Kafka 生产者 0.8.2,我正在尝试以立即发送消息的方式向主题发送一条消息。我有一个控制台消费者来观察消息是否到达。我注意到消息不会立即发送,除非我 运行 producer.close() 在发送后立即发送,这不是我想做的。
针对此问题的正确生产者配置设置是什么?我正在使用以下内容(我知道它看起来像是一堆不同的 configurations/versions,但我根本找不到像我在文档中期望的那样工作的东西):
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersStr);
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put("producer.type", "sync");
props.put("batch.num.messages", "1");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
我找到了一个看起来合理的解决方案,它涉及对 Producer 的 send() 命令返回的 Future 的 运行 get()。我更改了发送命令:
producer.send(record);
以下内容:
producer.send(record).get();
如果这种方法有任何问题,很高兴听到更有经验的 Kafka 用户的意见?另外,我有兴趣了解生产者是否有一个配置设置来实现同样的事情(即,立即发送一条消息而无需 运行 get() 的未来)。
旧post,但我已经很努力地错过了这里的post。
我在尝试 运行 Kafka 示例时偶然发现了相同的行为,而这个 .get()
是唯一将消息发送到 Kafka 的东西。 The Javadoc for KafkaProducer.send(…)
声明此方法是异步的。在我的测试代码中,消息因此被发送到 Kafka,而我的代码继续 运行 并且实际上刚刚到达 运行 的末尾并在消息实际发送到 Future
之前终止].
所以这个 .get()
只是阻塞在 Future
上,直到它被实现。这实际上消除了 Future
的好处。一种更简洁的方法是在 .send(…)
之后使用 Thread.sleep(…)
稍等片刻(取决于您的用例)。
我正在使用 Kafka 生产者 0.8.2,我正在尝试以立即发送消息的方式向主题发送一条消息。我有一个控制台消费者来观察消息是否到达。我注意到消息不会立即发送,除非我 运行 producer.close() 在发送后立即发送,这不是我想做的。
针对此问题的正确生产者配置设置是什么?我正在使用以下内容(我知道它看起来像是一堆不同的 configurations/versions,但我根本找不到像我在文档中期望的那样工作的东西):
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersStr);
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put("producer.type", "sync");
props.put("batch.num.messages", "1");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
我找到了一个看起来合理的解决方案,它涉及对 Producer 的 send() 命令返回的 Future 的 运行 get()。我更改了发送命令:
producer.send(record);
以下内容:
producer.send(record).get();
如果这种方法有任何问题,很高兴听到更有经验的 Kafka 用户的意见?另外,我有兴趣了解生产者是否有一个配置设置来实现同样的事情(即,立即发送一条消息而无需 运行 get() 的未来)。
旧post,但我已经很努力地错过了这里的post。
我在尝试 运行 Kafka 示例时偶然发现了相同的行为,而这个 .get()
是唯一将消息发送到 Kafka 的东西。 The Javadoc for KafkaProducer.send(…)
声明此方法是异步的。在我的测试代码中,消息因此被发送到 Kafka,而我的代码继续 运行 并且实际上刚刚到达 运行 的末尾并在消息实际发送到 Future
之前终止].
所以这个 .get()
只是阻塞在 Future
上,直到它被实现。这实际上消除了 Future
的好处。一种更简洁的方法是在 .send(…)
之后使用 Thread.sleep(…)
稍等片刻(取决于您的用例)。