Spring Integration ServiceActivator 是否适用于 Project Reactor 类型?

Does Spring Integration ServiceActivator work with Project Reactor Types?

我正在使用 Spring Integration 5.1.5(RabbitMQ 使用 spring-integration-amqp)并且我正在阅读 docs that Spring Integration has support for project reactor types (by this I mean Mono, Flux etc). But I cannot get this to work for ServiceActivator。我正在尝试这样的事情:

@ServiceActivator
public Mono<Void> myMethod(List<Message> messages) {
   Mono<Void> result = myService.doServiceStuff(messages);
   return result;
}

(请注意,我也在尝试让 myMethodFlux<Message> 一起工作,但这是一个单独的问题)。

myMethod returns Mono<Void> 我得到这个错误:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    ... 42 common frames omitted

将方法更改为:

@ServiceActivator
public void myMethod(List<Message> messages) {
   Mono<Void> result = myService.doServiceStuff(messages);
   result.subscribe(); // This is not what I want to do
}

并手动订阅反应流将使其工作,但这显然不是我想要做的。我更希望 spring-integration 框架来处理订阅。

Spring 集成是否支持此功能?如果是这样,我做错了什么?

您 returning Mono<Void> 到底想达到什么目的?

当服务激活器方法 return 有任何值时,该值将被发送到输出通道。如果是 Mono<?>,则在单声道完成时执行发送。

只需将 return 类型设置为 void