如何将 Mono 中的列表属性作为通量进行操作?
How to manipulate a list attribute inside Mono as a flux?
我是反应堆项目的新手,我正在尝试在 Mono
.
中操作 list
中的某些字段,如 Flux
所以我 Mono<Basket>
有一个名为 lines
的属性,它是 List<Line>
。
然后,对于每一行,我都必须调用两个外部服务来获取一些额外的信息。
这是我的代码:
Mono<Basket> basketMono = //this doesn't work cause I map it to a Flux
Mono.just(basket)
.flatMapIterable(Basket::getLines)
.parallel(...)
.runOn(...)
.map((line) -> {
line.setInfo1(externalService1.getInfo());
line.setInfo2(externalService2.getInfo());
return line;
});
我的主要问题是我不知道如何将此附加信息设置到行并保留原始对象,以便保存此代码的方法可以 return Mono<Basket>
已设置所有附加信息。
我正在为此苦苦挣扎。这种做法对吗?一些帮助将不胜感激。
一个简单的解决方案是不要将线平面化,因为它会创建一个包含 n 个元素(和类型线)的新发布者。在 flatmap 中,您可以启动另一个发布者,它转到服务、设置数据,然后您可以 return 原始对象。
Mono<Basket> basketMono = Mono.just(basket)
.flatMap(b ->
Flux.fromIterable(b.items)
.flatMap(this::callService1)
.flatMap(this::callService2)
.then(Mono.just(b))
);
我想你的外部服务调用是被动的,像这样:
Mono<Item> callService1(Item item) {
return mockService1().zipWith(Mono.just(item))
.map(it -> {
var result = it.getT2();
result.setInfo1(it.getT1());
return result;
});
}
Mono<String> mockService1() {
return Mono.just("some data " + ThreadLocalRandom.current().nextInt(100)).delayElement(Duration.ofMillis(100));
}
注意flatMap会自动订阅内部发布者。
我也做了一个简单的例子,你可以测试一下:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class Example {
class Item {
String info1;
String info2;
public void setInfo1(String info1) {
this.info1 = info1;
}
public void setInfo2(String info2) {
this.info2 = info2;
}
public String getInfo1() {
return info1;
}
public String getInfo2() {
return info2;
}
}
class Basket {
String user;
List<Item> items;
public Basket(String user, List<Item> items) {
this.user = user;
this.items = items;
}
}
Mono<String> mockService1() {
return Mono.just("some data " + ThreadLocalRandom.current().nextInt(100)).delayElement(Duration.ofMillis(100));
}
Mono<String> mockService2() {
return Mono.just("some other data " + ThreadLocalRandom.current().nextInt(1000)).delayElement(Duration.ofMillis(100));
}
Mono<Item> callService1(Item item) {
return mockService1().zipWith(Mono.just(item))
.map(it -> {
var result = it.getT2();
result.setInfo1(it.getT1());
return result;
});
}
Mono<Item> callService2(Item item) {
return mockService2().zipWith(Mono.just(item))
.map(it -> {
var result = it.getT2();
result.setInfo1(it.getT1());
return result;
});
}
@Test
public void testBasket() {
var basket = new Basket("first", List.of(new Item(), new Item(), new Item()));
Mono<Basket> basketMono = Mono.just(basket)
.flatMap(b ->
Flux.fromIterable(b.items)
.flatMap(this::callService1)
.flatMap(this::callService2)
.then(Mono.just(b))
);
StepVerifier.create(basketMono)
.expectNextMatches(b -> b.items.get(0).info1 != null)
.verifyComplete();
}
}
我是反应堆项目的新手,我正在尝试在 Mono
.
list
中的某些字段,如 Flux
所以我 Mono<Basket>
有一个名为 lines
的属性,它是 List<Line>
。
然后,对于每一行,我都必须调用两个外部服务来获取一些额外的信息。
这是我的代码:
Mono<Basket> basketMono = //this doesn't work cause I map it to a Flux
Mono.just(basket)
.flatMapIterable(Basket::getLines)
.parallel(...)
.runOn(...)
.map((line) -> {
line.setInfo1(externalService1.getInfo());
line.setInfo2(externalService2.getInfo());
return line;
});
我的主要问题是我不知道如何将此附加信息设置到行并保留原始对象,以便保存此代码的方法可以 return Mono<Basket>
已设置所有附加信息。
我正在为此苦苦挣扎。这种做法对吗?一些帮助将不胜感激。
一个简单的解决方案是不要将线平面化,因为它会创建一个包含 n 个元素(和类型线)的新发布者。在 flatmap 中,您可以启动另一个发布者,它转到服务、设置数据,然后您可以 return 原始对象。
Mono<Basket> basketMono = Mono.just(basket)
.flatMap(b ->
Flux.fromIterable(b.items)
.flatMap(this::callService1)
.flatMap(this::callService2)
.then(Mono.just(b))
);
我想你的外部服务调用是被动的,像这样:
Mono<Item> callService1(Item item) {
return mockService1().zipWith(Mono.just(item))
.map(it -> {
var result = it.getT2();
result.setInfo1(it.getT1());
return result;
});
}
Mono<String> mockService1() {
return Mono.just("some data " + ThreadLocalRandom.current().nextInt(100)).delayElement(Duration.ofMillis(100));
}
注意flatMap会自动订阅内部发布者。
我也做了一个简单的例子,你可以测试一下:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class Example {
class Item {
String info1;
String info2;
public void setInfo1(String info1) {
this.info1 = info1;
}
public void setInfo2(String info2) {
this.info2 = info2;
}
public String getInfo1() {
return info1;
}
public String getInfo2() {
return info2;
}
}
class Basket {
String user;
List<Item> items;
public Basket(String user, List<Item> items) {
this.user = user;
this.items = items;
}
}
Mono<String> mockService1() {
return Mono.just("some data " + ThreadLocalRandom.current().nextInt(100)).delayElement(Duration.ofMillis(100));
}
Mono<String> mockService2() {
return Mono.just("some other data " + ThreadLocalRandom.current().nextInt(1000)).delayElement(Duration.ofMillis(100));
}
Mono<Item> callService1(Item item) {
return mockService1().zipWith(Mono.just(item))
.map(it -> {
var result = it.getT2();
result.setInfo1(it.getT1());
return result;
});
}
Mono<Item> callService2(Item item) {
return mockService2().zipWith(Mono.just(item))
.map(it -> {
var result = it.getT2();
result.setInfo1(it.getT1());
return result;
});
}
@Test
public void testBasket() {
var basket = new Basket("first", List.of(new Item(), new Item(), new Item()));
Mono<Basket> basketMono = Mono.just(basket)
.flatMap(b ->
Flux.fromIterable(b.items)
.flatMap(this::callService1)
.flatMap(this::callService2)
.then(Mono.just(b))
);
StepVerifier.create(basketMono)
.expectNextMatches(b -> b.items.get(0).info1 != null)
.verifyComplete();
}
}