Reactor GroupedFlux - 等待完成
Reactor GroupedFlux - wait to complete
有一个像下面这样的异步发布者,Project Reactor 有没有办法等到整个流处理完成?
当然,无需添加未知持续时间的睡眠...
@Test
public void groupByPublishOn() throws InterruptedException {
UnicastProcessor<Integer> processor = UnicastProcessor.create();
List<Integer> results = new ArrayList<>();
Flux<Flux<Integer>> groupPublisher = processor.publish(1)
.autoConnect()
.groupBy(i -> i % 2)
.map(group -> group.publishOn(Schedulers.parallel()));
groupPublisher.log()
.subscribe(g -> g.log()
.subscribe(results::add));
List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
input.forEach(processor::onNext);
processor.onComplete();
Thread.sleep(500);
Assert.assertTrue(results.size() == input.size());
}
您可以替换这些行:
groupPublisher.log()
.subscribe(g -> g.log()
.subscribe(results::add));
有了这个
groupPublisher.log()
.flatMap(g -> g.log()
.doOnNext(results::add)
)
.blockLast();
flatMap
是一种比订阅内订阅更好的模式,它将为您负责订阅群组。
doOnNext
处理消耗性副作用(向集合添加值),使您无需在订阅中执行该操作。
blockLast()
替换订阅,而不是让您为它阻止的事件提供处理程序直到完成(和 returns 最后发出的项目,但您已经处理好了在 doOnNext 内)。
使用 blockLast() 的主要问题是,如果您的操作无法完成,您将永远不会释放管道。
你需要做的是获取 Disposable 并检查是否已完成管道,这意味着布尔值 isDisposed 它将 return 是的。
然后由您决定是否要超时,例如延迟计数实现:)
整数计数 = 0;
@Test
public void checkIfItDisposable() throws InterruptedException {
Disposable subscribe = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map(number -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
}).subscribeOn(Schedulers.newElastic("1"))
.subscribe();
while (!subscribe.isDisposed() && count < 100) {
Thread.sleep(400);
count++;
System.out.println("Waiting......");
}
System.out.println("It disposable:" + subscribe.isDisposed());
如果你想使用blockLast,至少要添加一个超时
@Test
public void checkIfItDisposableBlocking() throws InterruptedException {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map(number -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
}).subscribeOn(Schedulers.newElastic("1"))
.blockLast(Duration.of(60, ChronoUnit.SECONDS));
System.out.println("It disposable");
}
如果您需要更多ides,您可以在此处查看更多Reactor示例https://github.com/politrons/reactive
有一个像下面这样的异步发布者,Project Reactor 有没有办法等到整个流处理完成?
当然,无需添加未知持续时间的睡眠...
@Test
public void groupByPublishOn() throws InterruptedException {
UnicastProcessor<Integer> processor = UnicastProcessor.create();
List<Integer> results = new ArrayList<>();
Flux<Flux<Integer>> groupPublisher = processor.publish(1)
.autoConnect()
.groupBy(i -> i % 2)
.map(group -> group.publishOn(Schedulers.parallel()));
groupPublisher.log()
.subscribe(g -> g.log()
.subscribe(results::add));
List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
input.forEach(processor::onNext);
processor.onComplete();
Thread.sleep(500);
Assert.assertTrue(results.size() == input.size());
}
您可以替换这些行:
groupPublisher.log()
.subscribe(g -> g.log()
.subscribe(results::add));
有了这个
groupPublisher.log()
.flatMap(g -> g.log()
.doOnNext(results::add)
)
.blockLast();
flatMap
是一种比订阅内订阅更好的模式,它将为您负责订阅群组。
doOnNext
处理消耗性副作用(向集合添加值),使您无需在订阅中执行该操作。
blockLast()
替换订阅,而不是让您为它阻止的事件提供处理程序直到完成(和 returns 最后发出的项目,但您已经处理好了在 doOnNext 内)。
使用 blockLast() 的主要问题是,如果您的操作无法完成,您将永远不会释放管道。
你需要做的是获取 Disposable 并检查是否已完成管道,这意味着布尔值 isDisposed 它将 return 是的。
然后由您决定是否要超时,例如延迟计数实现:)
整数计数 = 0;
@Test
public void checkIfItDisposable() throws InterruptedException {
Disposable subscribe = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map(number -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
}).subscribeOn(Schedulers.newElastic("1"))
.subscribe();
while (!subscribe.isDisposed() && count < 100) {
Thread.sleep(400);
count++;
System.out.println("Waiting......");
}
System.out.println("It disposable:" + subscribe.isDisposed());
如果你想使用blockLast,至少要添加一个超时
@Test
public void checkIfItDisposableBlocking() throws InterruptedException {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map(number -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
}).subscribeOn(Schedulers.newElastic("1"))
.blockLast(Duration.of(60, ChronoUnit.SECONDS));
System.out.println("It disposable");
}
如果您需要更多ides,您可以在此处查看更多Reactor示例https://github.com/politrons/reactive