增加 Kafka Streams 消费者吞吐量

Increase Kafka Streams Consumer Throughput

我有一个 Spark Streaming 应用程序和一个 Kafka Streams 应用程序 运行 并排,用于基准测试。两者都使用相同的输入主题并写入不同的目标数据库。输入主题有 15 个分区,spark streaming 和 kafka 流都有 15 个消费者(1:1 比例)。此外,事件有效载荷约为 2kb。不确定它是否相关,但 Spark Streaming 的 90% 百分位数执行时间约为 9 毫秒。卡夫卡流,12 毫秒。每次处理消息时都会在我的处理器中调用 commit() 方法。

问题出在高爆上。 Spark Streaming 可以跟上每秒 700 次,而 Kafka Streams 只能达到每秒 60/70 次左右。我不能超越那个。见下图:(绿线 - Spark Streaming / 蓝线 - Kafka Streams)

根据下面的配置,只要每个消费者不超过 1000 个事件,考虑到背压,无论每个分区的字节数如何,Spark Streaming 都可以跟上。至于 Kafka Streams,如果我正确理解它的配置(请保持诚实),基于下面的相同,我每 100 毫秒最多可以获取 1000 条记录(max.poll.records)(poll.ms ),只要每个分区不超过 1MB (max.partition.fetch.bytes),每次提取不超过 50MB (fetch.max.bytes)。

无论我使用的是 5、10 还是 15 个消费者,我都看到了相同的结果(卡在每秒 70 个事件上),这让我认为它与配置相关。我试图通过增加每次提取的记录数和每个分区的最大字节数来调整这些,但我没有得到显着的结果。

我知道这些是不同的技术并用于不同的目的,但我想知道我应该在 Kafka Streams 中使用什么值以获得更好的吞吐量。

Spark 流配置:

spark.batch.duration=10
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000
spark.streaming.kafka.maxRatePerPartition=100

Kafka Streams 配置(所有字节和时序相关)

# Consumer Config
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
heartbeat.interval.ms = 3000 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 300000 
max.poll.records = 1000 
request.timeout.ms = 30000
enable.auto.commit = false

# StreamsConfig
poll.ms=100 

处理器代码

public class KStreamsMessageProcessor extends AbstractProcessor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String payload) {

        ResponseEntity responseEntity = null;
        try {

          // Do Some processing

        } catch (final MyException e) {

          // Do Some Exception Handling

        } finally {

            context.forward(UUID.randomUUID().toString(), responseEntity);
            context.commit();
        }
    }

提前致谢!

更新

Kafka Streams 写入的数据库是这里的大瓶颈。在我们将它切换到一个更好的集群(更好的硬件、内存、内核等)之后,我使用下面的配置进行了调整,我每秒能够消耗大约 2k 个事件。提交间隔配置也已更改(根据 Augusto 的建议)并且还使用了 G1GC 垃圾收集器。

fetch.max.bytes = 52428800
max.partition.fetch.bytes = 1048576 

fetch.max.wait.ms = 1000 
max.poll.records = 10000 
fetch.min.bytes = 100000
enable.auto.commit = false

if I understood its configs correctly (and please keep me honest), based on the same below, I am able to fetch a max of 1000 records (max.poll.records) every 100ms (poll.ms), as long as it doesn't exceed 1MB per partition (max.partition.fetch.bytes) and 50MB per fetch (fetch.max.bytes).

这是不正确的。 :) max.poll.records 指定 return 可以由 poll() 编辑多少条记录——如果单个“获取”到代理 return 的更多记录,下一个“轮询( )" 调用将从消费者的内部缓冲区(即无网络请求)提供服务。 max.poll.records 基本上是一个调整应用程序代码的旋钮,即在再次调用 poll() 之前我想处理多少条记录。更频繁地调用 poll() 会使您的应用程序更具反应性(例如,只有在调用 poll() 时才会发生重新平衡——您还需要经常调用 poll 甚至不违反 max.poll.interval.ms)。

poll.ms是在没有数据可用的情况下poll()内的最大阻塞时间。这避免了忙等待。但是,如果有数据,poll()会立即return。

因此,实际的“网络吞吐量”仅基于“获取请求”设置。