将 Mono 与 Flux 相结合
Combine Mono with Flux
我想创建一个结合两个反应源结果的服务。
一个正在生产 Mono,另一个正在生产 Flux。对于合并,我需要为每个发射的通量使用相同的单声道值。
现在我有这样的东西
Flux.zip(
service1.getConfig(), //produces flux
service2.getContext() //produces mono
.cache().repeat()
)
这给了我我需要的,
- service2 只被调用一次
- 为每个配置提供上下文
- 生成的通量具有与配置一样多的元素
但我注意到 repeat() 在缓存上下文后发出大量元素。这是个问题吗?
我可以做些什么来将重复次数限制为接收到的配置数,但仍然同时进行两个请求吗?
或者这不是问题,我可以放心地忽略那些额外发出的元素吗?
我尝试使用 combineLatest
,但根据时间安排,我的一些配置元素可能会丢失并且无法处理。
编辑
参考@Ricard Kollcaku 的建议,我创建了示例测试来说明为什么这不是我想要的。
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
public class SampleTest
{
Logger LOG = LoggerFactory.getLogger(SampleTest.class);
AtomicLong counter = new AtomicLong(0);
Flux<String> getFlux()
{
return Flux.fromStream(() -> {
LOG.info("flux started");
sleep(1000);
return Stream.of("a", "b", "c");
}).subscribeOn(Schedulers.parallel());
}
Mono<String> getMono()
{
return Mono.defer(() -> {
counter.incrementAndGet();
LOG.info("mono started");
sleep(1000);
return Mono.just("mono");
}).subscribeOn(Schedulers.parallel());
}
private void sleep(final long milis)
{
try
{
Thread.sleep(milis);
}
catch (final InterruptedException e)
{
e.printStackTrace();
}
}
@Test
void test0()
{
final Flux<String> result = Flux.zip(
getFlux(),
getMono().cache().repeat()
.doOnNext(n -> LOG.warn("signal on mono", n)),
(s1, s2) -> s1 + " " + s2
);
assertResults(result);
}
@Test
void test1()
{
final Flux<String> result =
getFlux().flatMap(s -> Mono.zip(Mono.just(s), getMono(),
(s1, s2) -> s1 + " " + s2));
assertResults(result);
}
@Test
void test2()
{
final Flux<String> result = getFlux().flatMap(s -> getMono().map((s1 -> s + " " + s1)));
assertResults(result);
}
void assertResults(final Flux<String> result)
{
final Flux<String> flux = result;
StepVerifier.create(flux)
.expectNext("a mono")
.expectNext("b mono")
.expectNext("c mono")
.verifyComplete();
Assertions.assertEquals(1L, counter.get());
}
查看test1和test2的测试结果
2020-01-20 12:55:22.542 INFO [] [] [ parallel-3] SampleTest : flux started
2020-01-20 12:55:24.547 INFO [] [] [ parallel-4] SampleTest : mono started
2020-01-20 12:55:24.547 INFO [] [] [ parallel-5] SampleTest : mono started
2020-01-20 12:55:24.548 INFO [] [] [ parallel-6] SampleTest : mono started
expected: <1> but was: <3>
我需要拒绝你的提议。在这两种情况下 getMono 是
- 调用次数与不断变化的项目一样多
- 在第一个通量元素到达后调用
这些是我想避免的互动。我的服务正在后台发出 http 请求,它们可能很耗时。
我目前的解决方案没有这个问题,但如果我将记录器添加到我的 zip 我会得到这个
2020-01-20 12:55:20.505 INFO [] [] [ parallel-1] SampleTest : flux started
2020-01-20 12:55:20.508 INFO [] [] [ parallel-2] SampleTest : mono started
2020-01-20 12:55:21.523 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.528 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.535 WARN [] [] [ parallel-2] SampleTest : signal on mono
如您所见,将 cache().repeat()
组合在一起会发出很多元素,我想知道这是否是一个问题,如果是,那么如何避免它(但保持单一调用 mono 和并行调用)。
像 Project Reactor 和 RxJava 这样的库试图提供尽可能多的功能组合,但不提供对组合功能工具的访问。因此,总会有一些极端情况没有被涵盖。
据我所知,我自己的 DF4J 是唯一提供组合功能方法的异步库。例如,这是用户压缩 Flux 和 Mono 的方式:(当然,这个 class 不是 DF4J 本身的一部分):
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
abstract class ZipActor<T1, T2> extends Actor {
InpFlow<T1> inpFlow = new InpFlow<>(this);
InpFlow<T2> inpScalar = new InpFlow<>(this);
ZipActor(Flux<T1> flux, Mono<T2> mono) {
flux.subscribe(inpFlow);
mono.subscribe(inpScalar);
}
@Override
protected void runAction() throws Throwable {
if (inpFlow.isCompleted()) {
stop();
return;
}
T1 element1 = inpFlow.removeAndRequest();
T2 element2 = inpScalar.current();
runAction(element1, element2);
}
protected abstract void runAction(T1 element1, T2 element2);
}
这是它的用法:
@Test
public void ZipActorTest() {
Flux<Integer> flux = Flux.just(1,2,3);
Mono<Integer> mono = Mono.just(5);
ZipActor<Integer, Integer> actor = new ZipActor<Integer, Integer>(flux, mono){
@Override
protected void runAction(Integer element1, Integer element2) {
System.out.println("got:"+element1+" and:"+element2);
}
};
actor.start();
actor.join();
}
控制台输出如下:
got:1 and:5
got:2 and:5
got:3 and:5
只需简单的更改就可以做到
getFlux()
.flatMap(s -> Mono.zip(Mono.just(s),getMono(), (s1, s2) -> s1+" "+s2))
.subscribe(System.out::println);
Flux<String> getFlux(){
return Flux.just("a","b","c");
}
Mono<String> getMono(){
return Mono.just("mono");
}
如果你想使用 zip 但你可以使用 flatmap 获得相同的结果
getFlux()
.flatMap(s -> getMono()
.map((s1 -> s + " " + s1)))
.subscribe(System.out::println);
}
Flux<String> getFlux() {
return Flux.just("a", "b", "c");
}
Mono<String> getMono() {
return Mono.just("mono");
}
两个结果都是:
单声道
b 单声道
单声道
编辑
好的,现在我理解得更好了。你能试试这个解决方案吗?
getMono().
flatMapMany(s -> getFlux().map(s1 -> s1 + " " + s))
.subscribe(System.out::println);
Flux<String> getFlux() {
return Flux.defer(() -> {
System.out.println("init flux");
return Flux.just("a", "b", "c");
});
}
Mono<String> getMono() {
return Mono.defer(() -> {
System.out.println("init Mono");
return Mono.just("sss");
});
}
我认为您可以通过 Flux.join
来实现您想要实现的目标
下面是一些示例代码:
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();
Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();
List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
return x + y;
}).collectList().block();
System.out.println(list);
我想创建一个结合两个反应源结果的服务。 一个正在生产 Mono,另一个正在生产 Flux。对于合并,我需要为每个发射的通量使用相同的单声道值。
现在我有这样的东西
Flux.zip(
service1.getConfig(), //produces flux
service2.getContext() //produces mono
.cache().repeat()
)
这给了我我需要的,
- service2 只被调用一次
- 为每个配置提供上下文
- 生成的通量具有与配置一样多的元素
但我注意到 repeat() 在缓存上下文后发出大量元素。这是个问题吗?
我可以做些什么来将重复次数限制为接收到的配置数,但仍然同时进行两个请求吗? 或者这不是问题,我可以放心地忽略那些额外发出的元素吗?
我尝试使用 combineLatest
,但根据时间安排,我的一些配置元素可能会丢失并且无法处理。
编辑
参考@Ricard Kollcaku 的建议,我创建了示例测试来说明为什么这不是我想要的。
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
public class SampleTest
{
Logger LOG = LoggerFactory.getLogger(SampleTest.class);
AtomicLong counter = new AtomicLong(0);
Flux<String> getFlux()
{
return Flux.fromStream(() -> {
LOG.info("flux started");
sleep(1000);
return Stream.of("a", "b", "c");
}).subscribeOn(Schedulers.parallel());
}
Mono<String> getMono()
{
return Mono.defer(() -> {
counter.incrementAndGet();
LOG.info("mono started");
sleep(1000);
return Mono.just("mono");
}).subscribeOn(Schedulers.parallel());
}
private void sleep(final long milis)
{
try
{
Thread.sleep(milis);
}
catch (final InterruptedException e)
{
e.printStackTrace();
}
}
@Test
void test0()
{
final Flux<String> result = Flux.zip(
getFlux(),
getMono().cache().repeat()
.doOnNext(n -> LOG.warn("signal on mono", n)),
(s1, s2) -> s1 + " " + s2
);
assertResults(result);
}
@Test
void test1()
{
final Flux<String> result =
getFlux().flatMap(s -> Mono.zip(Mono.just(s), getMono(),
(s1, s2) -> s1 + " " + s2));
assertResults(result);
}
@Test
void test2()
{
final Flux<String> result = getFlux().flatMap(s -> getMono().map((s1 -> s + " " + s1)));
assertResults(result);
}
void assertResults(final Flux<String> result)
{
final Flux<String> flux = result;
StepVerifier.create(flux)
.expectNext("a mono")
.expectNext("b mono")
.expectNext("c mono")
.verifyComplete();
Assertions.assertEquals(1L, counter.get());
}
查看test1和test2的测试结果
2020-01-20 12:55:22.542 INFO [] [] [ parallel-3] SampleTest : flux started
2020-01-20 12:55:24.547 INFO [] [] [ parallel-4] SampleTest : mono started
2020-01-20 12:55:24.547 INFO [] [] [ parallel-5] SampleTest : mono started
2020-01-20 12:55:24.548 INFO [] [] [ parallel-6] SampleTest : mono started
expected: <1> but was: <3>
我需要拒绝你的提议。在这两种情况下 getMono 是 - 调用次数与不断变化的项目一样多 - 在第一个通量元素到达后调用 这些是我想避免的互动。我的服务正在后台发出 http 请求,它们可能很耗时。
我目前的解决方案没有这个问题,但如果我将记录器添加到我的 zip 我会得到这个
2020-01-20 12:55:20.505 INFO [] [] [ parallel-1] SampleTest : flux started
2020-01-20 12:55:20.508 INFO [] [] [ parallel-2] SampleTest : mono started
2020-01-20 12:55:21.523 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.528 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.535 WARN [] [] [ parallel-2] SampleTest : signal on mono
如您所见,将 cache().repeat()
组合在一起会发出很多元素,我想知道这是否是一个问题,如果是,那么如何避免它(但保持单一调用 mono 和并行调用)。
像 Project Reactor 和 RxJava 这样的库试图提供尽可能多的功能组合,但不提供对组合功能工具的访问。因此,总会有一些极端情况没有被涵盖。
据我所知,我自己的 DF4J 是唯一提供组合功能方法的异步库。例如,这是用户压缩 Flux 和 Mono 的方式:(当然,这个 class 不是 DF4J 本身的一部分):
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
abstract class ZipActor<T1, T2> extends Actor {
InpFlow<T1> inpFlow = new InpFlow<>(this);
InpFlow<T2> inpScalar = new InpFlow<>(this);
ZipActor(Flux<T1> flux, Mono<T2> mono) {
flux.subscribe(inpFlow);
mono.subscribe(inpScalar);
}
@Override
protected void runAction() throws Throwable {
if (inpFlow.isCompleted()) {
stop();
return;
}
T1 element1 = inpFlow.removeAndRequest();
T2 element2 = inpScalar.current();
runAction(element1, element2);
}
protected abstract void runAction(T1 element1, T2 element2);
}
这是它的用法:
@Test
public void ZipActorTest() {
Flux<Integer> flux = Flux.just(1,2,3);
Mono<Integer> mono = Mono.just(5);
ZipActor<Integer, Integer> actor = new ZipActor<Integer, Integer>(flux, mono){
@Override
protected void runAction(Integer element1, Integer element2) {
System.out.println("got:"+element1+" and:"+element2);
}
};
actor.start();
actor.join();
}
控制台输出如下:
got:1 and:5
got:2 and:5
got:3 and:5
只需简单的更改就可以做到
getFlux()
.flatMap(s -> Mono.zip(Mono.just(s),getMono(), (s1, s2) -> s1+" "+s2))
.subscribe(System.out::println);
Flux<String> getFlux(){
return Flux.just("a","b","c");
}
Mono<String> getMono(){
return Mono.just("mono");
}
如果你想使用 zip 但你可以使用 flatmap 获得相同的结果
getFlux()
.flatMap(s -> getMono()
.map((s1 -> s + " " + s1)))
.subscribe(System.out::println);
}
Flux<String> getFlux() {
return Flux.just("a", "b", "c");
}
Mono<String> getMono() {
return Mono.just("mono");
}
两个结果都是: 单声道 b 单声道 单声道
编辑 好的,现在我理解得更好了。你能试试这个解决方案吗?
getMono().
flatMapMany(s -> getFlux().map(s1 -> s1 + " " + s))
.subscribe(System.out::println);
Flux<String> getFlux() {
return Flux.defer(() -> {
System.out.println("init flux");
return Flux.just("a", "b", "c");
});
}
Mono<String> getMono() {
return Mono.defer(() -> {
System.out.println("init Mono");
return Mono.just("sss");
});
}
我认为您可以通过 Flux.join
下面是一些示例代码:
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();
Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();
List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
return x + y;
}).collectList().block();
System.out.println(list);