在 spring-webflux 中使用 Spring AMQP 消费者

Using Spring AMQP consumer in spring-webflux

我有一个使用 Boot 2.0 和 webflux 的应用程序,并且有一个端点返回 ServerSentEvent 的 Flux。这些事件是通过利用 spring-amqp 消耗 RabbitMQ 队列中的消息而创建的。我的问题是:如何最好地将 MessageListener 的已配置侦听器方法桥接到可以传递到我的控制器的 Flux?

Project Reactor 的 create 部分提到它 "can be very useful to bridge an existing API with the reactive world - such as an asynchronous API based on listeners",但我不确定如何直接连接到消息侦听器,因为它包含在 DirectMessageListenerContainer 和 [=17= 中].他们在创建部分的示例:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( 
      new MyEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});

到目前为止,我拥有的最佳选择是创建一个 Processor 并在 RabbitMQ 侦听器方法中每次调用 onNext() 以手动生成一个事件。

假设您想要一个 RabbitMQ 侦听器以某种方式将消息发送给一个或多个 Flux(es)。 Flux.create 确实是创建这样一个 Flux 的好方法。

让我们从Messaging with RabbitMQ Spring指南开始,并尝试适应它。

必须修改原始 Receiver 才能将收到的消息放入 FluxSink

@Component
public class Receiver {

    /**
     * Collection of sinks enables more than one subscriber.
     * Have to keep in mind that the FluxSink instance that the emitter works with, is provided per-subscriber.
     */
    private final List<FluxSink<String>> sinks = new ArrayList<>();

    /**
     * Adds a sink to the collection. From now on, new messages will be put to the sink.
     * Method will be called when a new Flux is created by calling Flux.create method.
     */  
    public void addSink(FluxSink<String> sink) {
        sinks.add(sink);
    }

    public void receiveMessage(String message) {
        sinks.forEach(sink -> {
            if (!sink.isCancelled()) {
                sink.next(message);
            } else {
                // If canceled, don't put any new messages to the sink.
                // Sink is canceled when a subscriber cancels the subscription.
                sinks.remove(sink);
            }
        });
    }
}

现在我们有了一个接收器,可以将 RabbitMQ 消息放入接收器。然后,创建一个 Flux 就相当简单了。

@Component
public class FluxFactory {

    private final Receiver receiver;

    public FluxFactory(Receiver receiver) { this.receiver = receiver; }

    public Flux<String> createFlux() {
        return Flux.create(receiver::addSink);
    }
}

Receiver bean 被自动装配到工厂。当然,您不必创建一个特殊的工厂。这仅演示了如何使用 Receiver 创建 Flux.

的想法

Messaging with RabbitMQ 指南中的应用程序的其余部分可能保持不变,包括 bean 实例化。

@SpringBootApplication
public class Application {
    ...
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, 
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
    ...
}

我使用类似的设计来适配 Twitter 流 API 成功。不过,可能有更好的方法。

我有这样的东西:

@SpringBootApplication
@RestController
public class AmqpToWebfluxApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(AmqpToWebfluxApplication.class, args);

        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("foo", "event-" + i);
        }

    }

    private TopicProcessor<String> sseFluxProcessor = TopicProcessor.share("sseFromAmqp", Queues.SMALL_BUFFER_SIZE);

    @GetMapping(value = "/sseFromAmqp", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getSeeFromAmqp() {
        return this.sseFluxProcessor;
    }

    @RabbitListener(id = "fooListener", queues = "foo")
    public void handleAmqpMessages(String message) {
        this.sseFluxProcessor.onNext(message);
    }

}

TopicProcessor.share() 允许有许多并发订阅者,当我们 return 这个 TopicProcessor 作为我们 /sseFromAmqp REST 请求的 Flux 通过WebFlux.

@RabbitListener 只是将其收到的消息委托给 TopicProcessor

main() 我有一个代码来确认即使没有订阅者我也可以发布到 TopicProcessor

使用两个单独的 curl 会话进行测试,并通过 RabbitMQ 管理插件将消息发布到队列。

顺便说一句,我使用 share() 因为:https://projectreactor.io/docs/core/release/reference/#_topicprocessor

from multiple upstream Publishers when created in the shared configuration

那是因为 @RabbitListener 确实可以从不同的 ListenerContainer 线程同时调用。

更新

我还把这个示例移到了我的 Sandbox: https://github.com/artembilan/sendbox/tree/master/amqp-to-webflux