What is the idea of bindings in Spring Boot RabbitMQ?
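I need to bind several exchanges, each with several routing keys, to a single queue. I want to send messages by exchange and routing key, and receive them by listening to the queue by its name.
My code: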
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(ExchangeConfig.class)
public class RabbitConfig {
    private final ExchangeConfig exchangeConfig;
    @Bean
    public List<Binding> bindings() {
        List<Binding> bindings = new ArrayList<>();
        exchangeConfig.getExchangesWithKeys()
                .forEach(exchangeWithKeys -> exchangeWithKeys.getRoutingKeys()
                        .forEach(key -> {
                            Exchange exchange = ExchangeBuilder.directExchange(exchangeWithKeys.getExchange()).build();
                            Queue queue = QueueBuilder.durable(exchangeConfig.getLogsQueue()).build();
                            Binding binding = BindingBuilder.bind(queue).to(exchange)
                                    .with(key).noargs();
                            bindings.add(binding);
                        }));
        return bindings;
    }
}
Configuration:
spring:
  rabbitmq:
    host: localhost
    port: 5672
rabbitmq:
  exchanges-with-keys:
    - exchange: exchange1
      routing-keys: exchange1.live, exchange1.after
    - exchange: exchange2
      routing-keys: exchange2.live, exchange2.after
    - exchange: exchange3
      routing-keys: exchange3.live, exchange3.after
  logs-queue: log-messages_q
Properties:
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class ExchangeConfig {
    private String logsQueue;
    private List<ExchangeWithKeys> exchangesWithKeys;
    @Data
    public static class ExchangeWithKeys {
        private String exchange;
        private List<String> routingKeys;
    }
}
Listener:
@Component
@Slf4j
@RequiredArgsConstructor
public class LogsListener {
    private final LogMessageEventProcessor logMessageEventProcessor;
    @RabbitListener(queues = "${rabbitmq.logs-queue}")
    public void onLiveEvent(LogMessageEvent event) {
        log.info("Received log event message [{}]", event.getBody());
        logMessageEventProcessor.processLogMessageEvent(event);
    }
}
Test:
@SpringBootTest
@ContextConfiguration(initializers = LogsListenerTest.Initializer.class)
class LogsListenerTest {
    @Autowired
    private RabbitTemplate template;
    @ClassRule
    private static final RabbitMQContainer container = new RabbitMQContainer("rabbitmq:3.7.25-management-alpine")
            .withExposedPorts(5672, 15672).withQueue("log-messages_q");
    @BeforeAll
    private static void startRabbit() {
        container.start();
    }
    @AfterAll
    private static void stopRabbit() {
        container.stop();
    }
    @Test
    public void test() {
        template.convertAndSend("exchange1", "exchange1.live", new LogMessageEvent());
        template.receiveAndConvert("log-messages_q");
    }
    public static class Initializer implements
            ApplicationContextInitializer<ConfigurableApplicationContext> {
        @Override
        public void initialize(@NotNull ConfigurableApplicationContext configurableApplicationContext) {
            val values = TestPropertyValues.of(
                    "spring.rabbitmq.host=" + container.getContainerIpAddress(),
                    "spring.rabbitmq.port=" + container.getMappedPort(5672)
            );
            values.applyTo(configurableApplicationContext);
        }
    }
}
None of the above works.
So where should I put these bindings to make it work? Thanks.
Which version are you using? The use of List<Binding> has been replaced by Declarables.
See https://docs.spring.io/spring-amqp/docs/current/reference/html/#collection-declaration
The documentation is a bit out of date; the admin's declareCollections property was removed in 2.2.
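As a rough illustration of the Declarables approach, the configuration from the question could be rewritten along these lines. This is only a sketch, not a verified fix: it assumes Spring AMQP 2.1 or later, reuses the ExchangeConfig class from the question (assumed to be registered as a bean), and the bean name logDeclarables is made up for the example. It also declares the queue and the exchanges alongside the bindings, on the assumption that they do not already exist on the broker.

```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // ExchangeConfig is the @ConfigurationProperties class from the question.
    private final ExchangeConfig exchangeConfig;

    public RabbitConfig(ExchangeConfig exchangeConfig) {
        this.exchangeConfig = exchangeConfig;
    }

    // Hypothetical bean name; a single Declarables bean groups the queue,
    // the exchanges and the bindings so the admin can declare them together.
    @Bean
    public Declarables logDeclarables() {
        List<Declarable> declarables = new ArrayList<>();

        // One durable queue shared by every binding.
        Queue queue = QueueBuilder.durable(exchangeConfig.getLogsQueue()).build();
        declarables.add(queue);

        for (ExchangeConfig.ExchangeWithKeys entry : exchangeConfig.getExchangesWithKeys()) {
            // One direct exchange per configured entry.
            DirectExchange exchange = new DirectExchange(entry.getExchange());
            declarables.add(exchange);

            // One binding per routing key, all pointing at the same queue.
            for (String key : entry.getRoutingKeys()) {
                declarables.add(BindingBuilder.bind(queue).to(exchange).with(key));
            }
        }
        return new Declarables(declarables);
    }
}
```

With something like this in place, a message sent with template.convertAndSend("exchange1", "exchange1.live", ...) should be routed to log-messages_q, where the @RabbitListener from the question can pick it up.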