设置消息优先级 Spring 集成 DSL
Set message priority Spring Integration DSL
我正在尝试设置一个将消息发布到 RabbitMQ 的集成工作流。
关于这个我有两个问题:
1. 我的 Queue Bean 是否如我所愿地工作:)
2. 如何使用 Integration DSL 通过 outbound-amqp-adapter 设置消息的优先级?
@Configuration
public class RabbitConfig {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Bean
TopicExchange worksExchange() {
return new TopicExchange("work.exchange", true, false);
}
@Bean
Queue queue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
return new Queue("dms.document.upload.queue", true, false, false, args);
}
@Bean
public RabbitTemplate worksRabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
template.setExchange("work.exchange");
template.setRoutingKey("work");
template.setConnectionFactory(rabbitConnectionFactory);
return template;
}
/**
 * Integration flow configuration that forwards messages from the
 * {@code worksChannel} channel to RabbitMQ.
 */
@Configuration
public class WorksOutbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    /**
     * Builds the flow: read from {@code worksChannel}, apply the JSON transformer,
     * then hand the message to an AMQP outbound adapter backed by
     * {@code rabbitConfig.worksRabbitTemplate()}.
     *
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        RabbitTemplate template = rabbitConfig.worksRabbitTemplate();
        return IntegrationFlows
                .from("worksChannel")
                .transform(Transformers.toJson())
                .handle(Amqp.outboundAdapter(template))
                .get();
    }
}
更新
在能够使用适当的 "priority" header 推送消息之后,我可以通过 Rabbit Management UI 按照消息的优先级提取消息,但我无法使用 spring-amqp 消费者正确地(按优先级)提取它们...
/**
 * Creates the listener container that consumes from the queue returned by
 * {@code worksQueue()}, using two concurrent consumers and not requeueing
 * rejected messages by default.
 *
 * <p>NOTE(review): {@code worksQueue()} is not among the beans shown alongside this
 * snippet, and no message listener is set here; confirm both exist elsewhere.
 * NOTE(review): messages already prefetched by a consumer are presumably no longer
 * reordered by priority; verify the prefetch setting if ordering looks wrong.
 *
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer workListenerContainer() {
    SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(rabbitConnectionFactory);
    listenerContainer.setQueues(worksQueue());
    listenerContainer.setConcurrentConsumers(2);
    listenerContainer.setDefaultRequeueRejected(false);
    return listenerContainer;
}
看起来不错。
在 .handle() 之前,使用 .enrichHeaders(...) 添加一个名称为 IntegrationMessageHeaderAccessor.PRIORITY、值为整数的 header。
编辑
/**
 * Self-contained example: two messages are sent through a gateway with their
 * {@code foo} header copied into the priority header, and the listener container
 * is started only after both have been sent.
 */
@SpringBootApplication
public class So49692361Application {

    public static void main(String[] args) {
        SpringApplication.run(So49692361Application.class, args);
    }

    /**
     * Sends "foo" (header foo=1) and "bar" (header foo=2) through the {@link Gate}
     * gateway, then starts the listener container, which is not auto-started.
     */
    @Bean
    public ApplicationRunner runner(SimpleMessageListenerContainer container, ApplicationContext ctx) {
        return runnerArgs -> {
            Gate gateway = ctx.getBean(Gate.class);
            gateway.send(new GenericMessage<>("foo", Collections.singletonMap("foo", 1)));
            gateway.send(new GenericMessage<>("bar", Collections.singletonMap("foo", 2)));
            container.start();
        };
    }

    /**
     * Flow from the {@link Gate} gateway: sets the priority header from the
     * expression {@code headers.foo}, then publishes with routing key
     * {@code so49692361}.
     */
    @Bean
    public static IntegrationFlow flow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows
                .from(Gate.class)
                .enrichHeaders(enricher -> enricher.headerExpression(
                        IntegrationMessageHeaderAccessor.PRIORITY, "headers.foo"))
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("so49692361"))
                .get();
    }

    /** Declares the durable {@code so49692361} queue with {@code x-max-priority} set to 5. */
    @Bean
    public Queue queue() {
        return new Queue("so49692361", true, false, false,
                Collections.singletonMap("x-max-priority", 5));
    }

    /**
     * Listener container for {@link #queue()} that prints every received message.
     * Auto-startup is disabled; the container is started explicitly by the runner.
     */
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(cf);
        listenerContainer.setQueues(queue());
        listenerContainer.setMessageListener(received -> System.out.println(received));
        listenerContainer.setAutoStartup(false);
        return listenerContainer;
    }

    /** Gateway interface used as the entry point of the flow. */
    public interface Gate {

        void send(Message<?> message);
    }
}
输出结果:
(Body:'bar' MessageProperties [headers={errorChannel=, foo=2, priority=2}, ...
(Body:'foo' MessageProperties [headers={errorChannel=, foo=1, priority=1}, ...
我正在尝试设置一个将消息发布到 RabbitMQ 的集成工作流。
关于这个我有两个问题: 1. 我的 Queue Bean 是否如我所愿地工作:) 2. 如何使用 Integration DSL 通过 outbound-amqp-adapter 设置消息的优先级?
@Configuration
public class RabbitConfig {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Bean
TopicExchange worksExchange() {
return new TopicExchange("work.exchange", true, false);
}
@Bean
Queue queue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
return new Queue("dms.document.upload.queue", true, false, false, args);
}
@Bean
public RabbitTemplate worksRabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
template.setExchange("work.exchange");
template.setRoutingKey("work");
template.setConnectionFactory(rabbitConnectionFactory);
return template;
}
/**
 * Integration flow configuration that forwards messages from the
 * {@code worksChannel} channel to RabbitMQ.
 */
@Configuration
public class WorksOutbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    /**
     * Builds the flow: read from {@code worksChannel}, apply the JSON transformer,
     * then hand the message to an AMQP outbound adapter backed by
     * {@code rabbitConfig.worksRabbitTemplate()}.
     *
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        RabbitTemplate template = rabbitConfig.worksRabbitTemplate();
        return IntegrationFlows
                .from("worksChannel")
                .transform(Transformers.toJson())
                .handle(Amqp.outboundAdapter(template))
                .get();
    }
}
更新:在能够使用适当的 "priority" header 推送消息之后,我可以通过 Rabbit Management UI 按照消息的优先级提取消息,但我无法使用 spring-amqp 消费者正确地(按优先级)提取它们...
/**
 * Creates the listener container that consumes from the queue returned by
 * {@code worksQueue()}, using two concurrent consumers and not requeueing
 * rejected messages by default.
 *
 * <p>NOTE(review): {@code worksQueue()} is not among the beans shown alongside this
 * snippet, and no message listener is set here; confirm both exist elsewhere.
 * NOTE(review): messages already prefetched by a consumer are presumably no longer
 * reordered by priority; verify the prefetch setting if ordering looks wrong.
 *
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer workListenerContainer() {
    SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(rabbitConnectionFactory);
    listenerContainer.setQueues(worksQueue());
    listenerContainer.setConcurrentConsumers(2);
    listenerContainer.setDefaultRequeueRejected(false);
    return listenerContainer;
}
看起来不错。
在 .handle() 之前,使用 .enrichHeaders(...) 添加一个名称为 IntegrationMessageHeaderAccessor.PRIORITY、值为整数的 header。
编辑
/**
 * Self-contained example: two messages are sent through a gateway with their
 * {@code foo} header copied into the priority header, and the listener container
 * is started only after both have been sent.
 */
@SpringBootApplication
public class So49692361Application {

    public static void main(String[] args) {
        SpringApplication.run(So49692361Application.class, args);
    }

    /**
     * Sends "foo" (header foo=1) and "bar" (header foo=2) through the {@link Gate}
     * gateway, then starts the listener container, which is not auto-started.
     */
    @Bean
    public ApplicationRunner runner(SimpleMessageListenerContainer container, ApplicationContext ctx) {
        return runnerArgs -> {
            Gate gateway = ctx.getBean(Gate.class);
            gateway.send(new GenericMessage<>("foo", Collections.singletonMap("foo", 1)));
            gateway.send(new GenericMessage<>("bar", Collections.singletonMap("foo", 2)));
            container.start();
        };
    }

    /**
     * Flow from the {@link Gate} gateway: sets the priority header from the
     * expression {@code headers.foo}, then publishes with routing key
     * {@code so49692361}.
     */
    @Bean
    public static IntegrationFlow flow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows
                .from(Gate.class)
                .enrichHeaders(enricher -> enricher.headerExpression(
                        IntegrationMessageHeaderAccessor.PRIORITY, "headers.foo"))
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("so49692361"))
                .get();
    }

    /** Declares the durable {@code so49692361} queue with {@code x-max-priority} set to 5. */
    @Bean
    public Queue queue() {
        return new Queue("so49692361", true, false, false,
                Collections.singletonMap("x-max-priority", 5));
    }

    /**
     * Listener container for {@link #queue()} that prints every received message.
     * Auto-startup is disabled; the container is started explicitly by the runner.
     */
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(cf);
        listenerContainer.setQueues(queue());
        listenerContainer.setMessageListener(received -> System.out.println(received));
        listenerContainer.setAutoStartup(false);
        return listenerContainer;
    }

    /** Gateway interface used as the entry point of the flow. */
    public interface Gate {

        void send(Message<?> message);
    }
}
输出结果:
(Body:'bar' MessageProperties [headers={errorChannel=, foo=2, priority=2}, ...
(Body:'foo' MessageProperties [headers={errorChannel=, foo=1, priority=1}, ...