How to set Kafka Producer message rate per second?

I am reading a CSV file and feeding its rows to my Kafka Producer. Now I want my Kafka Producer to produce messages at a rate of 100 messages per second.

If you like stream processing, akka-streams has good support for throttling: http://doc.akka.io/docs/akka/current/java/stream/stream-quickstart.html#time-based-processing

The akka-stream-kafka (a.k.a. reactive-kafka) library then lets you connect the two: http://doc.akka.io/docs/akka-stream-kafka/current/home.html
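If pulling in a streaming library is more than you need, the same throttling idea can be sketched in plain Java. This is only a minimal illustration, not the akka-streams API: the class ThrottledSender, the method sendThrottled and the sink parameter are hypothetical names, and the sink stands in for a call such as producer.send(new ProducerRecord<>(topic, line)) on a real KafkaProducer. It spaces sends on a fixed schedule so that the average rate does not exceed ratePerSecond.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: paces calls to a sink on a fixed time schedule. */
final class ThrottledSender {

    /**
     * Hands each line to `sink`, with line i due at start + i * (1s / ratePerSecond).
     * If a send is slow, later sends catch up without sleeping, so this holds the
     * long-run average rate rather than a strict cap on every one-second window.
     * Returns the number of lines handed to the sink.
     */
    public static int sendThrottled(List<String> lines, int ratePerSecond, Consumer<String> sink) {
        if (ratePerSecond <= 0) {
            throw new IllegalArgumentException("ratePerSecond must be positive");
        }
        long intervalNanos = 1_000_000_000L / ratePerSecond;
        long start = System.nanoTime();
        int sent = 0;
        for (String line : lines) {
            long dueAt = start + sent * intervalNanos;
            long waitNanos;
            // Loop in case sleep returns slightly early.
            while ((waitNanos = dueAt - System.nanoTime()) > 0) {
                try {
                    TimeUnit.NANOSECONDS.sleep(waitNanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return sent;                        // stop early when interrupted
                }
            }
            sink.accept(line); // in a real program: producer.send(new ProducerRecord<>(topic, line))
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a,1", "b,2", "c,3");
        int sent = sendThrottled(lines, 100, line -> System.out.println("send: " + line));
        System.out.println("sent " + sent + " lines");
    }
}
```

Because the schedule is computed from the start time rather than from the previous send, small sleep inaccuracies do not accumulate over a long CSV file.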

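Take a look at the linger.ms and batch.size properties of the Kafka Producer. You have to adjust these properties accordingly to get the desired rate. The documentation for linger.ms reads: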

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay; that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.
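As a rough sketch of where these two settings go, the snippet below builds a java.util.Properties object of the kind passed to the KafkaProducer constructor. The class and method names (ProducerBatchingConfig, batchingProps) are hypothetical, and the values in main (localhost:9092, linger.ms=5, batch.size=16384) are only illustrative starting points, not a recommendation for reaching any particular rate.

```java
import java.util.Properties;

/** Hypothetical helper that collects batching-related producer settings. */
final class ProducerBatchingConfig {

    public static Properties batchingProps(String bootstrapServers, int lingerMs, int batchSizeBytes) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound on how long a partially filled batch waits for more records.
        props.put("linger.ms", Integer.toString(lingerMs));
        // Per-partition batch size in bytes; a full batch is sent regardless of linger.ms.
        props.put("batch.size", Integer.toString(batchSizeBytes));
        return props;
    }

    public static void main(String[] args) {
        Properties props = batchingProps("localhost:9092", 5, 16384);
        // With the Kafka client on the classpath: new KafkaProducer<String, String>(props)
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Note that these settings shape how records are grouped into requests; they do not by themselves cap the producer at a fixed number of messages per second.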

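In the Kafka JVM Producer, throughput depends on multiple factors, and it is most commonly measured in MB/sec rather than msg/sec. In your example, if each line in the CSV were 1 MB in size, you would need to tune your producer configs to achieve 100 MB/sec in order to reach your target throughput of 100 msg/sec.

While tuning producer configs, consider your batch.size value (measured in bytes). If it is set too low, the producer sends smaller requests more often and spends more time waiting for replies from the server. That keeps latency low, but it tends to reduce throughput. If you are using the asynchronous, callback-based producer, your overall throughput is also limited by how many requests the producer can send before waiting for a reply from the server, which is determined by max.in.flight.requests.per.connection.

If you set batch.size too high, latency under light load can suffer, because a batch that never fills is only sent to the broker for that partition once the linger.ms period has elapsed. A larger batch.size also means more memory taken from buffer.memory, which might put pressure on the GC.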
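The MB/sec versus msg/sec arithmetic can be made concrete with a small calculation. The sketch below uses hypothetical names (ThroughputEstimate, requiredBytesPerSecond, recordsPerBatch) and deliberately ignores per-record overhead and compression, so the numbers are rough estimates only.

```java
/** Hypothetical back-of-the-envelope estimates; ignores record overhead and compression. */
final class ThroughputEstimate {

    /** Bytes per second needed to sustain a target message rate. */
    public static long requiredBytesPerSecond(long messagesPerSecond, long avgMessageBytes) {
        return Math.multiplyExact(messagesPerSecond, avgMessageBytes);
    }

    /** Approximate number of records that fit into one batch of batchSizeBytes. */
    public static long recordsPerBatch(long batchSizeBytes, long avgMessageBytes) {
        if (avgMessageBytes <= 0) {
            throw new IllegalArgumentException("avgMessageBytes must be positive");
        }
        return batchSizeBytes / avgMessageBytes;
    }

    public static void main(String[] args) {
        // 100 msg/sec at 1 MiB per message, as in the example above.
        System.out.println(requiredBytesPerSecond(100, 1024 * 1024) + " bytes/sec");
        // Assumed 200-byte CSV lines with a 16384-byte batch.size.
        System.out.println(recordsPerBatch(16384, 200) + " records per batch");
    }
}
```

For typical CSV lines of a few hundred bytes, 100 msg/sec is only tens of kilobytes per second, so the target rate is usually limited by how fast the application hands records to the producer rather than by batching configuration.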