Spring 与 RxNetty 集成中的异步消息网关
Async Messaging Gateway in Spring Integration with RxNetty
我正在尝试将 Spring 中的异步消息网关与 RxNetty(异步 HTTP)集成。基本上我想要的是 return 一个 observable/CompletableFuture 到调用线程,并使用 Observable 的 zip/map/flatmap 在调用线程中进行一堆出站 HTTP 调用。我只是想看看这是否可能。此外,如果不使用 Rxjava 构造,我最好使用聚合器 eip 来构建一个简单的工作流。
从版本 4.1
开始,网关可以 return Reactor 2.0 Promise<?>
:
@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Promise<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Streams.defer(Arrays.asList("1", "2", "3", "4", "5"))
.get()
.map(Integer::parseInt)
.mapMany(integer -> testGateway.multiply(integer))
.collect()
.consume(integers -> ...)
.flush();
从版本 5.0
开始,已更改为 Reactor 3.1 Mono
。
我很确定有一些适配器可以将这种类型转换为对 RxJava 有价值的东西。
网关自版本 4.2
:
也支持 CompletableFuture<?>
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
我正在尝试将 Spring 中的异步消息网关与 RxNetty(异步 HTTP)集成。基本上我想要的是 return 一个 observable/CompletableFuture 到调用线程,并使用 Observable 的 zip/map/flatmap 在调用线程中进行一堆出站 HTTP 调用。我只是想看看这是否可能。此外,如果不使用 Rxjava 构造,我最好使用聚合器 eip 来构建一个简单的工作流。
从版本 4.1
开始,网关可以 return Reactor 2.0 Promise<?>
:
@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Promise<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Streams.defer(Arrays.asList("1", "2", "3", "4", "5"))
.get()
.map(Integer::parseInt)
.mapMany(integer -> testGateway.multiply(integer))
.collect()
.consume(integers -> ...)
.flush();
从版本 5.0
开始,已更改为 Reactor 3.1 Mono
。
我很确定有一些适配器可以将这种类型转换为对 RxJava 有价值的东西。
网关自版本 4.2
:
CompletableFuture<?>
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);