如何编写一个接受来自两个相关流的数据的订阅者?
How do I write a subscriber that accepts data from two related streams?
我是反应式编程和 Project Reactor 的新手,如果我用错了词,请提前致歉。
我有一个简单的过程性程序,我正在将其转换为反应式程序,更像是一种学习 activity,而不是其他任何东西。它不是网络应用程序,所以我只是使用 Project Reactor 而不是 Spring WebFlux。在旧应用程序中,逻辑是这样的:
An event is received. Take the customer ID from that event, get the full details of that customer, then pass both the event and the customer data to something downstream.
final OrderEvent event = mapper.readValue(json, OrderEvent.class);
final OrderData data = event.getData();
// Retrieve customer
final Customer customer = service.getCustomerWithId(data.getCustomerId());
// Log a summary
LOG.info(
"Received an event indicating that {} ordered {} products", customer.getName(), data.getOrderCount());
downstream.doSomething(data, customer);
忽略最后一条语句,这似乎很容易用 Reactor 实现。传入的 JSON 可以包装在 Mono<String>
中,一系列 map
(和 flatMap
?)语句可以将其转换为 Mono<MyData>
。我将服务更改为 return a Mono<Customer>
但我正在努力解决如何将服务的输出传递给下游对象的问题。如果它只需要客户数据,那么我会让它成为订阅者。不过,我被扔了,因为它需要原始数据 和 客户。如果我只是将所有内容链接在一起,当它转换为客户时我会丢失数据。
我应该如何处理这个问题?我考虑过使用 ContextView
但这似乎是为了将数据从 Subscriber
向上推,而不是相反。
编辑: 我试过了 Flux.zip
。这可能适用于此处描述的用例,但这是最好的方法吗?我担心在其他用例中,例如如果输入是客户 ID 并且 Web 服务 return 将该客户的所有订单编辑为 Flux<OrderData>
,那么被压缩的两个流将在每个流中包含不同数量的元素,而我不是确定它会起作用。
// Convert incoming events (a List) into a stream of order data
final Flux<OrderData> orderData = Flux.fromIterable(this.events)
.map(event -> event.getData());
// Get customers associated with order data. Each order should have exactly 1 customer.
final Flux<Customer> customers = orderData
.flatMap(data -> this.service.apply(data.getCustomerId()));
// Do something
Flux.zip(customers, orderData)
.doOnNext(tuple -> this.downstream.accept(tuple.getT1(), tuple.getT2()))
.blockLast();
我只是在最后阻止,因为没有什么可做的。一旦下游处理完所有内容,我的程序就可以退出了。
我正在使用 Reactor 3.4.17。
是的,zip
运算符可以工作。这是另一种方法:
Flux.fromIterable(this.events)
.map(
event -> this.service.apply(event.getData().getCustomerId())
.flatMap(customer -> this.downstream.accept(customer, event.getData()))
);
您可以在将事件作为参数的 lambda 表达式中获取客户。这样,您就可以引用事件和客户这两个参数。
在上面的示例中,我假设 this.downstream.accept
returns 一个 Flux
(如果需要,它可以是一个空的 Flux
),这就是为什么我m 使用 flatMap
运算符。
我是反应式编程和 Project Reactor 的新手,如果我用错了词,请提前致歉。
我有一个简单的过程性程序,我正在将其转换为反应式程序,更像是一种学习 activity,而不是其他任何东西。它不是网络应用程序,所以我只是使用 Project Reactor 而不是 Spring WebFlux。在旧应用程序中,逻辑是这样的:
An event is received. Take the customer ID from that event, get the full details of that customer, then pass both the event and the customer data to something downstream.
final OrderEvent event = mapper.readValue(json, OrderEvent.class);
final OrderData data = event.getData();
// Retrieve customer
final Customer customer = service.getCustomerWithId(data.getCustomerId());
// Log a summary
LOG.info(
"Received an event indicating that {} ordered {} products", customer.getName(), data.getOrderCount());
downstream.doSomething(data, customer);
忽略最后一条语句,这似乎很容易用 Reactor 实现。传入的 JSON 可以包装在 Mono<String>
中,一系列 map
(和 flatMap
?)语句可以将其转换为 Mono<MyData>
。我将服务更改为 return a Mono<Customer>
但我正在努力解决如何将服务的输出传递给下游对象的问题。如果它只需要客户数据,那么我会让它成为订阅者。不过,我被扔了,因为它需要原始数据 和 客户。如果我只是将所有内容链接在一起,当它转换为客户时我会丢失数据。
我应该如何处理这个问题?我考虑过使用 ContextView
但这似乎是为了将数据从 Subscriber
向上推,而不是相反。
编辑: 我试过了 Flux.zip
。这可能适用于此处描述的用例,但这是最好的方法吗?我担心在其他用例中,例如如果输入是客户 ID 并且 Web 服务 return 将该客户的所有订单编辑为 Flux<OrderData>
,那么被压缩的两个流将在每个流中包含不同数量的元素,而我不是确定它会起作用。
// Convert incoming events (a List) into a stream of order data
final Flux<OrderData> orderData = Flux.fromIterable(this.events)
.map(event -> event.getData());
// Get customers associated with order data. Each order should have exactly 1 customer.
final Flux<Customer> customers = orderData
.flatMap(data -> this.service.apply(data.getCustomerId()));
// Do something
Flux.zip(customers, orderData)
.doOnNext(tuple -> this.downstream.accept(tuple.getT1(), tuple.getT2()))
.blockLast();
我只是在最后阻止,因为没有什么可做的。一旦下游处理完所有内容,我的程序就可以退出了。
我正在使用 Reactor 3.4.17。
是的,zip
运算符可以工作。这是另一种方法:
Flux.fromIterable(this.events)
.map(
event -> this.service.apply(event.getData().getCustomerId())
.flatMap(customer -> this.downstream.accept(customer, event.getData()))
);
您可以在将事件作为参数的 lambda 表达式中获取客户。这样,您就可以引用事件和客户这两个参数。
在上面的示例中,我假设 this.downstream.accept
returns 一个 Flux
(如果需要,它可以是一个空的 Flux
),这就是为什么我m 使用 flatMap
运算符。