在 spring-webflux 中使用 Spring AMQP 消费者
Using Spring AMQP consumer in spring-webflux
我有一个使用 Boot 2.0 和 webflux 的应用程序,并且有一个端点返回 ServerSentEvent
的 Flux。这些事件是通过利用 spring-amqp
消耗 RabbitMQ 队列中的消息而创建的。我的问题是:如何最好地将 MessageListener
的已配置侦听器方法桥接到可以传递到我的控制器的 Flux?
Project Reactor 的 create 部分提到它 "can be very useful to bridge an existing API with the reactive world - such as an asynchronous API based on listeners",但我不确定如何直接连接到消息侦听器,因为它包含在 DirectMessageListenerContainer
和 [=17= 中].他们在创建部分的示例:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
到目前为止,我拥有的最佳选择是创建一个 Processor
并在 RabbitMQ 侦听器方法中每次调用 onNext()
以手动生成一个事件。
假设您想要一个 RabbitMQ 侦听器以某种方式将消息发送给一个或多个 Flux
(es)。 Flux.create
确实是创建这样一个 Flux
的好方法。
让我们从Messaging with RabbitMQ Spring指南开始,并尝试适应它。
必须修改原始 Receiver
才能将收到的消息放入 FluxSink
。
@Component
public class Receiver {
/**
* Collection of sinks enables more than one subscriber.
* Have to keep in mind that the FluxSink instance that the emitter works with, is provided per-subscriber.
*/
private final List<FluxSink<String>> sinks = new ArrayList<>();
/**
* Adds a sink to the collection. From now on, new messages will be put to the sink.
* Method will be called when a new Flux is created by calling Flux.create method.
*/
public void addSink(FluxSink<String> sink) {
sinks.add(sink);
}
public void receiveMessage(String message) {
sinks.forEach(sink -> {
if (!sink.isCancelled()) {
sink.next(message);
} else {
// If canceled, don't put any new messages to the sink.
// Sink is canceled when a subscriber cancels the subscription.
sinks.remove(sink);
}
});
}
}
现在我们有了一个接收器,可以将 RabbitMQ 消息放入接收器。然后,创建一个 Flux
就相当简单了。
@Component
public class FluxFactory {
private final Receiver receiver;
public FluxFactory(Receiver receiver) { this.receiver = receiver; }
public Flux<String> createFlux() {
return Flux.create(receiver::addSink);
}
}
Receiver
bean 被自动装配到工厂。当然,您不必创建一个特殊的工厂。这仅演示了如何使用 Receiver
创建 Flux
.
的想法
Messaging with RabbitMQ 指南中的应用程序的其余部分可能保持不变,包括 bean 实例化。
@SpringBootApplication
public class Application {
...
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
...
}
我使用类似的设计来适配 Twitter 流 API 成功。不过,可能有更好的方法。
我有这样的东西:
@SpringBootApplication
@RestController
public class AmqpToWebfluxApplication {
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(AmqpToWebfluxApplication.class, args);
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("foo", "event-" + i);
}
}
private TopicProcessor<String> sseFluxProcessor = TopicProcessor.share("sseFromAmqp", Queues.SMALL_BUFFER_SIZE);
@GetMapping(value = "/sseFromAmqp", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getSeeFromAmqp() {
return this.sseFluxProcessor;
}
@RabbitListener(id = "fooListener", queues = "foo")
public void handleAmqpMessages(String message) {
this.sseFluxProcessor.onNext(message);
}
}
TopicProcessor.share()
允许有许多并发订阅者,当我们 return 这个 TopicProcessor
作为我们 /sseFromAmqp
REST 请求的 Flux
通过WebFlux.
@RabbitListener
只是将其收到的消息委托给 TopicProcessor
。
在 main()
我有一个代码来确认即使没有订阅者我也可以发布到 TopicProcessor
。
使用两个单独的 curl
会话进行测试,并通过 RabbitMQ 管理插件将消息发布到队列。
顺便说一句,我使用 share()
因为:https://projectreactor.io/docs/core/release/reference/#_topicprocessor
from multiple upstream Publishers when created in the shared configuration
那是因为 @RabbitListener
确实可以从不同的 ListenerContainer
线程同时调用。
更新
我还把这个示例移到了我的 Sandbox
: https://github.com/artembilan/sendbox/tree/master/amqp-to-webflux
我有一个使用 Boot 2.0 和 webflux 的应用程序,并且有一个端点返回 ServerSentEvent
的 Flux。这些事件是通过利用 spring-amqp
消耗 RabbitMQ 队列中的消息而创建的。我的问题是:如何最好地将 MessageListener
的已配置侦听器方法桥接到可以传递到我的控制器的 Flux?
Project Reactor 的 create 部分提到它 "can be very useful to bridge an existing API with the reactive world - such as an asynchronous API based on listeners",但我不确定如何直接连接到消息侦听器,因为它包含在 DirectMessageListenerContainer
和 [=17= 中].他们在创建部分的示例:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
到目前为止,我拥有的最佳选择是创建一个 Processor
并在 RabbitMQ 侦听器方法中每次调用 onNext()
以手动生成一个事件。
假设您想要一个 RabbitMQ 侦听器以某种方式将消息发送给一个或多个 Flux
(es)。 Flux.create
确实是创建这样一个 Flux
的好方法。
让我们从Messaging with RabbitMQ Spring指南开始,并尝试适应它。
必须修改原始 Receiver
才能将收到的消息放入 FluxSink
。
@Component
public class Receiver {
/**
* Collection of sinks enables more than one subscriber.
* Have to keep in mind that the FluxSink instance that the emitter works with, is provided per-subscriber.
*/
private final List<FluxSink<String>> sinks = new ArrayList<>();
/**
* Adds a sink to the collection. From now on, new messages will be put to the sink.
* Method will be called when a new Flux is created by calling Flux.create method.
*/
public void addSink(FluxSink<String> sink) {
sinks.add(sink);
}
public void receiveMessage(String message) {
sinks.forEach(sink -> {
if (!sink.isCancelled()) {
sink.next(message);
} else {
// If canceled, don't put any new messages to the sink.
// Sink is canceled when a subscriber cancels the subscription.
sinks.remove(sink);
}
});
}
}
现在我们有了一个接收器,可以将 RabbitMQ 消息放入接收器。然后,创建一个 Flux
就相当简单了。
@Component
public class FluxFactory {
private final Receiver receiver;
public FluxFactory(Receiver receiver) { this.receiver = receiver; }
public Flux<String> createFlux() {
return Flux.create(receiver::addSink);
}
}
Receiver
bean 被自动装配到工厂。当然,您不必创建一个特殊的工厂。这仅演示了如何使用 Receiver
创建 Flux
.
Messaging with RabbitMQ 指南中的应用程序的其余部分可能保持不变,包括 bean 实例化。
@SpringBootApplication
public class Application {
...
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
...
}
我使用类似的设计来适配 Twitter 流 API 成功。不过,可能有更好的方法。
我有这样的东西:
@SpringBootApplication
@RestController
public class AmqpToWebfluxApplication {
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(AmqpToWebfluxApplication.class, args);
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("foo", "event-" + i);
}
}
private TopicProcessor<String> sseFluxProcessor = TopicProcessor.share("sseFromAmqp", Queues.SMALL_BUFFER_SIZE);
@GetMapping(value = "/sseFromAmqp", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getSeeFromAmqp() {
return this.sseFluxProcessor;
}
@RabbitListener(id = "fooListener", queues = "foo")
public void handleAmqpMessages(String message) {
this.sseFluxProcessor.onNext(message);
}
}
TopicProcessor.share()
允许有许多并发订阅者,当我们 return 这个 TopicProcessor
作为我们 /sseFromAmqp
REST 请求的 Flux
通过WebFlux.
@RabbitListener
只是将其收到的消息委托给 TopicProcessor
。
在 main()
我有一个代码来确认即使没有订阅者我也可以发布到 TopicProcessor
。
使用两个单独的 curl
会话进行测试,并通过 RabbitMQ 管理插件将消息发布到队列。
顺便说一句,我使用 share()
因为:https://projectreactor.io/docs/core/release/reference/#_topicprocessor
from multiple upstream Publishers when created in the shared configuration
那是因为 @RabbitListener
确实可以从不同的 ListenerContainer
线程同时调用。
更新
我还把这个示例移到了我的 Sandbox
: https://github.com/artembilan/sendbox/tree/master/amqp-to-webflux