Spring Boot kafka消费者在另一个已经收到消息后没有收到消息
Springboot kafka consumer not receiving messages after another has already received the message
我是 KAFKA 的新手,需要帮助
我有 2 个应用程序 (springboot),它们 identical/copies 只是端口不同。
http://localhost:8080/
http://localhost:8081/
他们都是消费者
二人听XXX话题
-
我还有一个APP可以充当制作人的角色
每当我向主题 XXX 发送内容时。
其中只有一个消费消息,另一个不消费。
我分别测试过,如果他们单独听,他们听得很好,但如果他们一起听,只有一个人会听。
我正在使用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.0.0.RELEASE</version>
</dependency>
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.headers=type
spring.cloud.stream.kafka.default.consumer.ackEachRecord=true
spring.cloud.stream.kafka.default.consumer.enableDlq=true
spring.cloud.stream.kafka.default.consumer.standardHeaders=both
spring.cloud.stream.kafka.default.consumer.dlqName=api***.d**.api***
我的听众
@StreamListener(target=SomeString.TOPIC, condition = "headers['type']=='***' or headers['type']=='***'")
public void handle(GenericMessage<String> message) throws BusinessException {
***
}
The Apache Kafka Binder implementation maps each destination to an
Apache Kafka topic. The consumer group maps directly to the same
Apache Kafka concept. Partitioning also maps directly to Apache Kafka
partitions as well.
检查kafka消费组行为-> https://kafka.apache.org/documentation/#consumerconfigs_group.id
如果kafka消费者有相同的groupId只是其中一个监听消息,你应该给他们不同的groupId
8080 中的应用程序 -> spring.cloud.stream.bindings.<channelName>.group=consumer-group-1
8081 中的应用 -> spring.cloud.stream.bindings.<channelName>.group=consumer-group-2
我是 KAFKA 的新手,需要帮助
我有 2 个应用程序 (springboot),它们 identical/copies 只是端口不同。 http://localhost:8080/ http://localhost:8081/
他们都是消费者
二人听XXX话题
我还有一个APP可以充当制作人的角色
每当我向主题 XXX 发送内容时。
其中只有一个消费消息,另一个不消费。
我分别测试过,如果他们单独听,他们听得很好,但如果他们一起听,只有一个人会听。
我正在使用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.0.0.RELEASE</version>
</dependency>
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.headers=type
spring.cloud.stream.kafka.default.consumer.ackEachRecord=true
spring.cloud.stream.kafka.default.consumer.enableDlq=true
spring.cloud.stream.kafka.default.consumer.standardHeaders=both
spring.cloud.stream.kafka.default.consumer.dlqName=api***.d**.api***
我的听众
@StreamListener(target=SomeString.TOPIC, condition = "headers['type']=='***' or headers['type']=='***'")
public void handle(GenericMessage<String> message) throws BusinessException {
***
}
The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic. The consumer group maps directly to the same Apache Kafka concept. Partitioning also maps directly to Apache Kafka partitions as well.
检查kafka消费组行为-> https://kafka.apache.org/documentation/#consumerconfigs_group.id
如果kafka消费者有相同的groupId只是其中一个监听消息,你应该给他们不同的groupId
8080 中的应用程序 -> spring.cloud.stream.bindings.<channelName>.group=consumer-group-1
8081 中的应用 -> spring.cloud.stream.bindings.<channelName>.group=consumer-group-2