Spring 与 RxNetty 集成中的异步消息网关

Async Messaging Gateway in Spring Integration with RxNetty

我正在尝试将 Spring 中的异步消息网关与 RxNetty(异步 HTTP)集成。基本上我想要的是 return 一个 observable/CompletableFuture 到调用线程,并使用 Observable 的 zip/map/flatmap 在调用线程中进行一堆出站 HTTP 调用。我只是想看看这是否可能。此外,如果不使用 Rxjava 构造,我最好使用聚合器 eip 来构建一个简单的工作流。

从版本 4.1 开始,网关可以 return Reactor 2.0 Promise<?>:

@MessagingGateway
public static interface TestGateway {

    @Gateway(requestChannel = "promiseChannel")
    Promise<Integer> multiply(Integer value);

}

    ...

@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
        return value * 2;
}

    ...

Streams.defer(Arrays.asList("1", "2", "3", "4", "5"))
            .get()
            .map(Integer::parseInt)
            .mapMany(integer -> testGateway.multiply(integer))
            .collect()
            .consume(integers -> ...)
            .flush();

从版本 5.0 开始,已更改为 Reactor 3.1 Mono

我很确定有一些适配器可以将这种类型转换为对 RxJava 有价值的东西。

网关自版本 4.2:

也支持 CompletableFuture<?>
CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#async-gateway