Reactor中如何进行多线程文件处理
How to do multithreaded file processing in Reactor
我正在尝试使用 Reactor 的 Flux 并行处理多个文件。主要工作负载发生在对 flatMap
的调用中,之后 Flux 被转换和过滤。
每当我尝试订阅生成的 Flux 时,主线程都会在我收到任何值之前退出。
Flux.fromStream(Files.list(Paths.get("directory"))
.flatMap(path -> {
return Flux.create(sink -> {
try (
RandomAccessFile file = new RandomAccessFile(new File(path), "r");
FileChannel fileChannel = file.getChannel()
) {
// Process file into tokens
sink.next(new Token(".."));
} catch (IOException e) {
sink.error(e);
} finally {
sink.complete();
}
}).subscribeOn(Schedulers.boundedElastic());
})
.map(token -> /* Transform tokens */)
.filter(token -> /* Filter tokens*/)
.subscribe(token -> /* Store tokens in list */)
我希望在我的列表中找到处理管道的输出,但程序立即退出。首先,我想知道我是否正确使用了 Flux class,其次,我将如何等待订阅调用完成?
I'd expect to find the output of the processing pipeline in my list, but the program immediately exits.
您那里的代码在主线程上设置您的反应链,然后……在主线程上什么都不做。主线程因此完成了它的工作,并且由于 boundedElastic()
线程是守护线程,没有其他线程阻止程序退出,所以它退出。
您可以通过一个更简单的示例看到相同的行为:
Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(500));
f.subscribe(System.out::println);
你当然可以调用 newBoundedElastic("name", false)
使它成为一个 非守护进程 支持的调度程序,但是你必须跟踪它并在什么时候调用 dispose你已经完成了,所以它实际上只是反转了问题(程序无限运行直到你处理调度程序。)
快速 'n' 肮脏的解决方案只是阻塞 Flux
的最后一个元素作为程序的最后一行 - 所以如果我们添加:
f.blockLast();
...然后程序在退出之前等待最后一个元素被发出,我们就有了我们想要的行为。
对于简单的概念验证,这很好。然而,它在 "production" 代码中并不理想。首先,"no blocking" 是响应式代码中的一般规则,因此如果您有这样的阻塞调用,则很难确定它是否有意。如果您添加了其他链并且还希望它们完成,则必须为每个链添加阻塞调用。那很乱,而且不可持续。
更好的解决方案是使用 CountDownLatch
:
CountDownLatch cdl = new CountDownLatch(1);
Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(500))
.doFinally(s -> cdl.countDown())
.subscribe(System.out::println);
cdl.await();
这样做的好处是不会显式阻塞,并且能够同时处理多个发布者(如果您将初始值设置为高于 1。)这也是我看到的通常推荐的方法这类事情 - 所以如果你想要最广泛接受的解决方案,可能就是这样。
但是,对于所有需要等待多个发布者而不是一个发布者的示例,我倾向于使用 Phaser
- 它的工作方式与 CountdownLatch 类似,但可以动态地 register()
以及deregister()
。这意味着您可以创建一个移相器,然后在需要时轻松地向其注册多个发布者,而无需更改初始值,例如:
Phaser phaser = new Phaser(1);
Flux.just(1, 2, 3, 4, 5)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);
Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);
phaser.arriveAndAwaitAdvance();
(如果需要,您当然也可以将 onSubscribe
和 doFinally
逻辑包装在单独的方法中。)
我正在尝试使用 Reactor 的 Flux 并行处理多个文件。主要工作负载发生在对 flatMap
的调用中,之后 Flux 被转换和过滤。
每当我尝试订阅生成的 Flux 时,主线程都会在我收到任何值之前退出。
Flux.fromStream(Files.list(Paths.get("directory"))
.flatMap(path -> {
return Flux.create(sink -> {
try (
RandomAccessFile file = new RandomAccessFile(new File(path), "r");
FileChannel fileChannel = file.getChannel()
) {
// Process file into tokens
sink.next(new Token(".."));
} catch (IOException e) {
sink.error(e);
} finally {
sink.complete();
}
}).subscribeOn(Schedulers.boundedElastic());
})
.map(token -> /* Transform tokens */)
.filter(token -> /* Filter tokens*/)
.subscribe(token -> /* Store tokens in list */)
我希望在我的列表中找到处理管道的输出,但程序立即退出。首先,我想知道我是否正确使用了 Flux class,其次,我将如何等待订阅调用完成?
I'd expect to find the output of the processing pipeline in my list, but the program immediately exits.
您那里的代码在主线程上设置您的反应链,然后……在主线程上什么都不做。主线程因此完成了它的工作,并且由于 boundedElastic()
线程是守护线程,没有其他线程阻止程序退出,所以它退出。
您可以通过一个更简单的示例看到相同的行为:
Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(500));
f.subscribe(System.out::println);
你当然可以调用 newBoundedElastic("name", false)
使它成为一个 非守护进程 支持的调度程序,但是你必须跟踪它并在什么时候调用 dispose你已经完成了,所以它实际上只是反转了问题(程序无限运行直到你处理调度程序。)
快速 'n' 肮脏的解决方案只是阻塞 Flux
的最后一个元素作为程序的最后一行 - 所以如果我们添加:
f.blockLast();
...然后程序在退出之前等待最后一个元素被发出,我们就有了我们想要的行为。
对于简单的概念验证,这很好。然而,它在 "production" 代码中并不理想。首先,"no blocking" 是响应式代码中的一般规则,因此如果您有这样的阻塞调用,则很难确定它是否有意。如果您添加了其他链并且还希望它们完成,则必须为每个链添加阻塞调用。那很乱,而且不可持续。
更好的解决方案是使用 CountDownLatch
:
CountDownLatch cdl = new CountDownLatch(1);
Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(500))
.doFinally(s -> cdl.countDown())
.subscribe(System.out::println);
cdl.await();
这样做的好处是不会显式阻塞,并且能够同时处理多个发布者(如果您将初始值设置为高于 1。)这也是我看到的通常推荐的方法这类事情 - 所以如果你想要最广泛接受的解决方案,可能就是这样。
但是,对于所有需要等待多个发布者而不是一个发布者的示例,我倾向于使用 Phaser
- 它的工作方式与 CountdownLatch 类似,但可以动态地 register()
以及deregister()
。这意味着您可以创建一个移相器,然后在需要时轻松地向其注册多个发布者,而无需更改初始值,例如:
Phaser phaser = new Phaser(1);
Flux.just(1, 2, 3, 4, 5)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);
Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);
phaser.arriveAndAwaitAdvance();
(如果需要,您当然也可以将 onSubscribe
和 doFinally
逻辑包装在单独的方法中。)