具有 Spring 集成的 PollableChannel
PollableChannel with Spring Integration
我有一个接口Channels.java
final String OUTPUT = "output";
final String INPUT = "input";
@Output(OUTPUT)
MessageChannel output();
@BridgeFrom(OUTPUT)
PollableChannel input();
我有另一个 class,我在其中执行所有消息传递操作:
@Autowired
@Qualifier(Channels.OUTPUT)
private MessageChannel Output;
我可以很好地向交易所发送消息。如何在这里使用我的 PollableChannel?我做错了什么?
编辑
以及如何访问我的@Component class 中的 bean?
我现在有 @Configuration class 和
@Bean
@BridgeTo(Channels.OUTPUT)
public PollableChannel polled() {
return new QueueChannel();
}
希望能够使用此频道接收消息?
桥必须是 @Bean
而不是接口方法上的注释 - 参见 the answer to your general question here。
编辑
@SpringBootApplication
@EnableBinding(Source.class)
public class So44018382Application implements CommandLineRunner {
final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So44018382Application.class, args);
Thread.sleep(60_000);
context.close();
}
@RabbitListener(bindings =
@QueueBinding(value = @Queue(value = "foo", autoDelete = "true"),
exchange = @Exchange(value = "output", type = "topic"), key = "#"))
// bind a queue to the output exchange
public void listen(String in) {
this.logger.info("received " + in);
}
@BridgeTo(value = Source.OUTPUT,
poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "2"))
@Bean
public PollableChannel polled() {
return new QueueChannel(5);
}
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 30; i++) {
polled().send(new GenericMessage<>("foo" + i));
this.logger.info("sent foo" + i);
}
}
}
这对我来说很好;队列的深度为 5;当它已满时,发送者阻塞;轮询器一次只删除 2 条消息并将它们发送到输出通道。
此示例还添加了一个 rabbit 侦听器来使用发送到活页夹的消息。
我有一个接口Channels.java
final String OUTPUT = "output";
final String INPUT = "input";
@Output(OUTPUT)
MessageChannel output();
@BridgeFrom(OUTPUT)
PollableChannel input();
我有另一个 class,我在其中执行所有消息传递操作:
@Autowired
@Qualifier(Channels.OUTPUT)
private MessageChannel Output;
我可以很好地向交易所发送消息。如何在这里使用我的 PollableChannel?我做错了什么?
编辑
以及如何访问我的@Component class 中的 bean?
我现在有 @Configuration class 和
@Bean
@BridgeTo(Channels.OUTPUT)
public PollableChannel polled() {
return new QueueChannel();
}
希望能够使用此频道接收消息?
桥必须是 @Bean
而不是接口方法上的注释 - 参见 the answer to your general question here。
编辑
@SpringBootApplication
@EnableBinding(Source.class)
public class So44018382Application implements CommandLineRunner {
final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So44018382Application.class, args);
Thread.sleep(60_000);
context.close();
}
@RabbitListener(bindings =
@QueueBinding(value = @Queue(value = "foo", autoDelete = "true"),
exchange = @Exchange(value = "output", type = "topic"), key = "#"))
// bind a queue to the output exchange
public void listen(String in) {
this.logger.info("received " + in);
}
@BridgeTo(value = Source.OUTPUT,
poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "2"))
@Bean
public PollableChannel polled() {
return new QueueChannel(5);
}
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 30; i++) {
polled().send(new GenericMessage<>("foo" + i));
this.logger.info("sent foo" + i);
}
}
}
这对我来说很好;队列的深度为 5;当它已满时,发送者阻塞;轮询器一次只删除 2 条消息并将它们发送到输出通道。
此示例还添加了一个 rabbit 侦听器来使用发送到活页夹的消息。