组 属性 无法在 spring 流云中工作?
Group property not work in spring stream cloud?
我使用 Spring Stream Cloud 在 Kafka 上消费消息。
当在kafka上产生消息时所有消费者都命中。
但是 kafka 的文档说通过使用组
只有一个消费者消费消息。
这是我的消费者代码。
@EnableBinding(Sink.class)
public class Consumer2 {
@StreamListener(target = Sink.INPUT)
public void consume(String message) {
System.out.println("33333");
}
@StreamListener(target = Sink.INPUT)
public void consume1(String message) {
System.out.println("444444");
}
}
}
这是我的配置
但是我的两个方法都叫:(
spring:
cloud:
stream:
default-binder: kafka
kafka:
binder:
brokers:
- localhost:9092
bindings:
input:
binder: kafka
destination: abbas
content-type: text/plain
group: input-group-1
output:
binder: kafka
destination: abbas
group: output-group-1
content-type: text/plain
使用该配置,您只有 1 个消费者 (SINK.INPUT)
,而不是 2 个消费者(@StreamListener
不是消费者,它是处理入站的模型消息)
这就是为什么 spring 将入站消息路由到您的两个 @StreamListener
使用相同接收器注释的原因。
在一个实例组中没有影响。
当运行我们的应用程序的多个实例时,每当输入通道中有新消息时,所有订阅者都会收到通知。
大多数时候,我们只需要处理一次消息。 Spring Cloud Stream 通过消费者组实现此行为。
要启用此行为,每个消费者绑定都可以使用 spring.cloud.stream.bindings..group 属性 来指定组名:
我使用 Spring Stream Cloud 在 Kafka 上消费消息。 当在kafka上产生消息时所有消费者都命中。
但是 kafka 的文档说通过使用组 只有一个消费者消费消息。
这是我的消费者代码。
@EnableBinding(Sink.class)
public class Consumer2 {
@StreamListener(target = Sink.INPUT)
public void consume(String message) {
System.out.println("33333");
}
@StreamListener(target = Sink.INPUT)
public void consume1(String message) {
System.out.println("444444");
}
}
}
这是我的配置 但是我的两个方法都叫:(
spring:
cloud:
stream:
default-binder: kafka
kafka:
binder:
brokers:
- localhost:9092
bindings:
input:
binder: kafka
destination: abbas
content-type: text/plain
group: input-group-1
output:
binder: kafka
destination: abbas
group: output-group-1
content-type: text/plain
使用该配置,您只有 1 个消费者 (SINK.INPUT)
,而不是 2 个消费者(@StreamListener
不是消费者,它是处理入站的模型消息)
这就是为什么 spring 将入站消息路由到您的两个 @StreamListener
使用相同接收器注释的原因。
在一个实例组中没有影响。
当运行我们的应用程序的多个实例时,每当输入通道中有新消息时,所有订阅者都会收到通知。
大多数时候,我们只需要处理一次消息。 Spring Cloud Stream 通过消费者组实现此行为。
要启用此行为,每个消费者绑定都可以使用 spring.cloud.stream.bindings..group 属性 来指定组名: