当我尝试将并发设置为 3 时,ConcurrentMessageListenerContainer 中的分区分配对于 2/3 线程为空 [ ]
Partition assignment in ConcurrentMessageListenerContainer is empty [ ] for 2/3 threads while I am trying to set concurrency to 3
因为响应主题为空,当尝试同时发送 3 个请求时,只有一个得到响应。
下面是并发监听器容器:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
containerProperties.setGroupId(returnGroup);
ConcurrentMessageListenerContainer<String, String> kafkaMessageListenerContainer =
new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProperties);
kafkaMessageListenerContainer.setConcurrency(3);
return kafkaMessageListenerContainer;
}
我希望所有 3 个请求都按顺序完成。
正如Artem所说,每个分区只能有一个消费者(每个消费者组)。
如果主题只有一个分区,则只能有一个消费者。
让一个回复容器接收多个回复副本完全没有意义。
如果该主题有 3 个消费者并希望等待 3 个回复,下一个版本 (2.3) 将有一个 AggregatingReplyingKafkaTemplate
。 https://docs.spring.io/spring-kafka/docs/2.3.0.BUILD-SNAPSHOT/reference/html/#aggregatingreplyingkafkatemplate
因为响应主题为空,当尝试同时发送 3 个请求时,只有一个得到响应。
下面是并发监听器容器:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
containerProperties.setGroupId(returnGroup);
ConcurrentMessageListenerContainer<String, String> kafkaMessageListenerContainer =
new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProperties);
kafkaMessageListenerContainer.setConcurrency(3);
return kafkaMessageListenerContainer;
}
我希望所有 3 个请求都按顺序完成。
正如Artem所说,每个分区只能有一个消费者(每个消费者组)。
如果主题只有一个分区,则只能有一个消费者。
让一个回复容器接收多个回复副本完全没有意义。
如果该主题有 3 个消费者并希望等待 3 个回复,下一个版本 (2.3) 将有一个 AggregatingReplyingKafkaTemplate
。 https://docs.spring.io/spring-kafka/docs/2.3.0.BUILD-SNAPSHOT/reference/html/#aggregatingreplyingkafkatemplate