使用 2 个消费者配置时消费者吞吐量缓慢
Slow consumer throuput when using 2 consumer-configuration
使用 spring-integration-kafka 扩展和以下配置:
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="#{kafkaConfig['zooKeeperUrl']}" zk-connection-timeout="10000"
zk-session-timeout="10000" zk-sync-time="2000" />
<int-kafka:consumer-context id="consumerContext" consumer-timeout="5000" zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration
group-id="realtime-services-consumer-grp"
value-decoder="purchaseDecoder"
key-decoder="kafkaReflectionDecoder"
max-messages="5" >
<int-kafka:topic id="purchase" streams="1" />
</int-kafka:consumer-configuration>
<int-kafka:consumer-configuration
group-id="realtime-services-consumer-gw"
value-decoder="eventDecoder"
key-decoder="kafkaReflectionDecoder"
max-messages="10" >
<int-kafka:topic id="event" streams="1" />
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<int-kafka:inbound-channel-adapter
id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
auto-startup="true" channel="inputFromKafka">
<int:poller fixed-delay="20" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>
例如,当我评论第一个 consumer-configuration
时,我每分钟可以处理 300 个事件,没有问题。但是当两者都被激活时。我的吞吐量非常低。来自这两个主题的总吞吐量低于每分钟 50。
有谁知道为什么我在阅读 2 个主题时表现如此糟糕?我在配置中做错了什么?
感谢您指出这一点!
在与我本地的 Kafka claster 进行了一番大战之后,我已经能够重现您的问题,并且我为您提供了一些解决方法:-)。
首先不是round-robin
,而是一个接一个:
for (final ConsumerConfiguration<K, V> consumerConfiguration : getConsumerConfigurations().values()) {
Map<String, Map<Integer, List<Object>>> messages = consumerConfiguration.receive();
其中每个 consumerConfiguration
在 consumer-timeout="5000"
期间都在后台被阻止,如果现在 KafkaStream
中没有消息。因此,来自 <int-kafka:inbound-channel-adapter>
的整个 poll
任务将被阻塞,直到超时或更糟:如果每个主题都没有消息,则整个等待时间就是超时的总和!
要解决此问题,您可以减少 consumer-timeout="5000"
或提供多个 <int-kafka:consumer-context>
,因此每个主题 <int-kafka:inbound-channel-adapter>
。
是的,它看起来很奇怪,我们在发布前还没有时间看一看这真的很糟糕,但无论如何请随时提出 JIRA 问题来修复它。
谢谢!
使用 spring-integration-kafka 扩展和以下配置:
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="#{kafkaConfig['zooKeeperUrl']}" zk-connection-timeout="10000"
zk-session-timeout="10000" zk-sync-time="2000" />
<int-kafka:consumer-context id="consumerContext" consumer-timeout="5000" zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration
group-id="realtime-services-consumer-grp"
value-decoder="purchaseDecoder"
key-decoder="kafkaReflectionDecoder"
max-messages="5" >
<int-kafka:topic id="purchase" streams="1" />
</int-kafka:consumer-configuration>
<int-kafka:consumer-configuration
group-id="realtime-services-consumer-gw"
value-decoder="eventDecoder"
key-decoder="kafkaReflectionDecoder"
max-messages="10" >
<int-kafka:topic id="event" streams="1" />
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<int-kafka:inbound-channel-adapter
id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
auto-startup="true" channel="inputFromKafka">
<int:poller fixed-delay="20" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>
例如,当我评论第一个 consumer-configuration
时,我每分钟可以处理 300 个事件,没有问题。但是当两者都被激活时。我的吞吐量非常低。来自这两个主题的总吞吐量低于每分钟 50。
有谁知道为什么我在阅读 2 个主题时表现如此糟糕?我在配置中做错了什么?
感谢您指出这一点!
在与我本地的 Kafka claster 进行了一番大战之后,我已经能够重现您的问题,并且我为您提供了一些解决方法:-)。
首先不是round-robin
,而是一个接一个:
for (final ConsumerConfiguration<K, V> consumerConfiguration : getConsumerConfigurations().values()) {
Map<String, Map<Integer, List<Object>>> messages = consumerConfiguration.receive();
其中每个 consumerConfiguration
在 consumer-timeout="5000"
期间都在后台被阻止,如果现在 KafkaStream
中没有消息。因此,来自 <int-kafka:inbound-channel-adapter>
的整个 poll
任务将被阻塞,直到超时或更糟:如果每个主题都没有消息,则整个等待时间就是超时的总和!
要解决此问题,您可以减少 consumer-timeout="5000"
或提供多个 <int-kafka:consumer-context>
,因此每个主题 <int-kafka:inbound-channel-adapter>
。
是的,它看起来很奇怪,我们在发布前还没有时间看一看这真的很糟糕,但无论如何请随时提出 JIRA 问题来修复它。
谢谢!