Spring Cloud Stream RabbitMQ 添加队列参数
Spring Cloud Stream RabbitMQ add Queue argument
我想知道如何向使用 spring 云流声明的 rabbitmq 队列添加额外参数。
我想为 RabbitMQ 3.8.x 使用单一主动消费者功能。为此,我必须向队列声明 x-single-active-consumer
.
添加一个额外的参数
无法直接使用 spring 属性对其进行配置。
spring 云流目前不支持设置任意队列参数。
请打开 GitHub issue against the binder 请求新功能。
但是,您可以简单地通过向应用程序添加一个 Queue
@Bean
参数集来声明队列。
或者,您可以简单地设置 exclusive
消费者活页夹属性,这提供了类似的语义;相互竞争的消费者将定期尝试重新连接。
编辑
@SpringBootApplication
@EnableBinding(Sink.class)
public class So59011707Application {
public static void main(String[] args) {
SpringApplication.run(So59011707Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
}
@Bean
Queue queue() {
return QueueBuilder.durable("so59011707.myGroup")
.withArgument("x-single-active-consumer", true)
.build();
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("so59011707", "", "foo");
};
}
}
和
spring.cloud.stream.bindings.input.destination=so59011707
spring.cloud.stream.bindings.input.group=myGroup
您将在日志中看到一条错误消息
2019-11-24 10:24:22.310 ERROR 83004 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-single-active-consumer' for queue 'so59011707.myGroup' in vhost '/': received none but current is the value 'true' of type 'bool', class-id=50, method-id=10)
你可以忽略。或者您可以通过将 bindQueue
设置为 false 并添加 Exchange
和 Binding
@Bean
s 来避免它...
spring.cloud.stream.rabbit.bindings.input.consumer.bind-queue=false
@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(exchange())
.with("#");
}
@Bean
TopicExchange exchange() {
return new TopicExchange("so59011707");
}
我想知道如何向使用 spring 云流声明的 rabbitmq 队列添加额外参数。
我想为 RabbitMQ 3.8.x 使用单一主动消费者功能。为此,我必须向队列声明 x-single-active-consumer
.
添加一个额外的参数
无法直接使用 spring 属性对其进行配置。
spring 云流目前不支持设置任意队列参数。
请打开 GitHub issue against the binder 请求新功能。
但是,您可以简单地通过向应用程序添加一个 Queue
@Bean
参数集来声明队列。
或者,您可以简单地设置 exclusive
消费者活页夹属性,这提供了类似的语义;相互竞争的消费者将定期尝试重新连接。
编辑
@SpringBootApplication
@EnableBinding(Sink.class)
public class So59011707Application {
public static void main(String[] args) {
SpringApplication.run(So59011707Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
}
@Bean
Queue queue() {
return QueueBuilder.durable("so59011707.myGroup")
.withArgument("x-single-active-consumer", true)
.build();
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("so59011707", "", "foo");
};
}
}
和
spring.cloud.stream.bindings.input.destination=so59011707
spring.cloud.stream.bindings.input.group=myGroup
您将在日志中看到一条错误消息
2019-11-24 10:24:22.310 ERROR 83004 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-single-active-consumer' for queue 'so59011707.myGroup' in vhost '/': received none but current is the value 'true' of type 'bool', class-id=50, method-id=10)
你可以忽略。或者您可以通过将 bindQueue
设置为 false 并添加 Exchange
和 Binding
@Bean
s 来避免它...
spring.cloud.stream.rabbit.bindings.input.consumer.bind-queue=false
@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(exchange())
.with("#");
}
@Bean
TopicExchange exchange() {
return new TopicExchange("so59011707");
}