如何有效地从集合中生成消息到 Kafka
How to efficiently produce messages out of a collection to Kafka
在我的 Scala (2.11) 流应用程序中,我正在使用 IBM MQ 中一个队列的数据并将其写入具有一个分区的 Kafka 主题。使用来自 MQ 的数据后,消息负载被拆分为 3000 条较小的消息,这些消息存储在字符串序列中。然后这 3000 条消息中的每一条都使用 KafkaProducer 发送到 Kafka(版本 2.x)。
你会如何发送这 3000 条消息?
我无法增加 IBM MQ 中的队列数量(不受我的控制),也无法增加主题中的分区数量(需要对消息进行排序,编写自定义分区程序会影响太多消费者)主题)。
Producer 设置当前为:
- 确认=1
- linger.ms=0
- batch.size=65536
但优化它们可能是它自己的问题,而不是我当前问题的一部分。
目前,我正在做
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](someProperties)
val messages: Seq[String] = Seq(String1, …, String3000)
for (msg <- messages) {
val future = kafkaProducer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
val recordMetadata = future.get()
}
对我来说,这似乎不是最优雅和最有效的方式。是否有提高吞吐量的编程方式?
根据@radai 的回答进行编辑
感谢为我指明正确方向的答案,我仔细研究了不同的 Producer 方法。 《卡夫卡 - 权威指南》一书列出了这些方法:
Fire-and-forget
We send a message to the server and don’t really care if it arrives succesfully or not. Most of the time, it will arrive successfully, since Kafka is highly available and the producer will retry sending messages automatically. However, some messages will get lost using this method.
Synchronous send
We send a message, the send() method returns a Future object, and we use get()
to wait on the future and see if the send() was successful or not.
Asynchronous send
We call the send() method with a callback function, which gets triggered when it
receives a response from the Kafka broker
现在我的代码如下所示(省略了错误处理和回调的定义 class):
val asyncProducer = new KafkaProducer[String, String](someProperties)
for (msg <- messages) {
val record = new ProducerRecord[String, String](someTopic, someKey, msg)
asyncProducer.send(record, new compareProducerCallback)
}
asyncProducer.flush()
我比较了10000条非常小的消息的所有方法。这是我的测量结果:
即发即弃:173683464ns
同步发送:29195039875ns
异步发送:44153826ns
老实说,通过选择正确的属性(batch.size、linger.ms、...),优化它们的潜力可能更大。
我认为您的代码运行缓慢的最大原因是您在等待每一个未来的发送。
kafka 被设计成发送批次。通过一次发送一条记录,您将等待每条记录的往返时间,并且您不会从压缩中获得任何好处。
"idiomatic" 要做的事情是发送所有内容,然后在第二个循环中阻止所有结果期货。
此外,如果您打算这样做,我会重新启动(否则您的第一个记录会产生一批大小为 1 的文件,整体上会减慢您的速度。一旦您的制作人出现 https://en.wikipedia.org/wiki/Nagle%27s_algorithm) and call flush()发送循环完成。
在我的 Scala (2.11) 流应用程序中,我正在使用 IBM MQ 中一个队列的数据并将其写入具有一个分区的 Kafka 主题。使用来自 MQ 的数据后,消息负载被拆分为 3000 条较小的消息,这些消息存储在字符串序列中。然后这 3000 条消息中的每一条都使用 KafkaProducer 发送到 Kafka(版本 2.x)。
你会如何发送这 3000 条消息?
我无法增加 IBM MQ 中的队列数量(不受我的控制),也无法增加主题中的分区数量(需要对消息进行排序,编写自定义分区程序会影响太多消费者)主题)。
Producer 设置当前为:
- 确认=1
- linger.ms=0
- batch.size=65536
但优化它们可能是它自己的问题,而不是我当前问题的一部分。
目前,我正在做
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](someProperties)
val messages: Seq[String] = Seq(String1, …, String3000)
for (msg <- messages) {
val future = kafkaProducer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
val recordMetadata = future.get()
}
对我来说,这似乎不是最优雅和最有效的方式。是否有提高吞吐量的编程方式?
根据@radai 的回答进行编辑
感谢为我指明正确方向的答案,我仔细研究了不同的 Producer 方法。 《卡夫卡 - 权威指南》一书列出了这些方法:
Fire-and-forget We send a message to the server and don’t really care if it arrives succesfully or not. Most of the time, it will arrive successfully, since Kafka is highly available and the producer will retry sending messages automatically. However, some messages will get lost using this method.
Synchronous send We send a message, the send() method returns a Future object, and we use get() to wait on the future and see if the send() was successful or not.
Asynchronous send We call the send() method with a callback function, which gets triggered when it receives a response from the Kafka broker
现在我的代码如下所示(省略了错误处理和回调的定义 class):
val asyncProducer = new KafkaProducer[String, String](someProperties)
for (msg <- messages) {
val record = new ProducerRecord[String, String](someTopic, someKey, msg)
asyncProducer.send(record, new compareProducerCallback)
}
asyncProducer.flush()
我比较了10000条非常小的消息的所有方法。这是我的测量结果:
即发即弃:173683464ns
同步发送:29195039875ns
异步发送:44153826ns
老实说,通过选择正确的属性(batch.size、linger.ms、...),优化它们的潜力可能更大。
我认为您的代码运行缓慢的最大原因是您在等待每一个未来的发送。
kafka 被设计成发送批次。通过一次发送一条记录,您将等待每条记录的往返时间,并且您不会从压缩中获得任何好处。
"idiomatic" 要做的事情是发送所有内容,然后在第二个循环中阻止所有结果期货。
此外,如果您打算这样做,我会重新启动(否则您的第一个记录会产生一批大小为 1 的文件,整体上会减慢您的速度。一旦您的制作人出现 https://en.wikipedia.org/wiki/Nagle%27s_algorithm) and call flush()发送循环完成。