反应式 Flux 编程中的并行执行
Parallel execution in reactive Flux programming
我正在尝试避免使用阻塞线程模型并使用反应模型来实现高吞吐量。
我的用例是这样的:有大量传入消息。对于每条消息,我都需要做一些 I/O 而不会阻塞该线程。在这里,我在单独的线程中处理每条消息。如果应用程序被终止,我需要完成正在进行的任务并优雅地关闭。我在下面使用 Thread.sleep 来模拟密集的 I/O 操作。
代码示例如下:
public class TestReactor {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
Disposable task = Flux.range(1, 100).parallel().runOn(Schedulers.fromExecutor(executor)).doOnNext(message -> {
System.out.println(Thread.currentThread().getName()+": processing " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+": Done");
})
.sequential()
.doOnError(e->e.printStackTrace())
.doOnCancel(()->{
System.out.println("disposing.....");
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
})
.subscribe();
Thread.sleep(4000);
task.dispose();
System.out.println("Disposed. Waiting for sometime before exit.");
Thread.sleep(20000);
}
}
当我 运行 这样做时,通量似乎忽略了 executor.shutdown() 并因中断异常而出错。是否可以使用 flux 实现我的用例?
添加输出:
你犯了一个大错误:在使用响应式编程时,不要使用任何线程操作。这是一个肮脏的黑客。当您尝试在 FRP 上编写代码时,糟糕设计和代码异味的最明显标记是:
- 在运算符和副作用函数 (doOn) 中调用
try-catch
块。如果您有例外情况 - 好的,没关系。调用 onError
运算符并在其中处理管道的行为。
- 在 Mono/Flux 中使用
Thread.sleep
和其他并发 API。 Weblfux API 也有很好的 API 来处理并发和多线程。
因此,关于问题及其解决方案:您的设计很糟糕 :) 您不需要处理任何一次性用品或线程中断。你只需要删除它并添加 killswitch。
所以,工作代码,它可以满足您的需求:
public class TestReactor {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
Flux.generate(() -> 0,
(state, sink) -> {
sink.next("current state = " + state);
if (state == 100) sink.complete();
return state + 1;
}
)
.parallel()
.runOn(Schedulers.fromExecutor(executor))
.doOnNext(message -> {
System.out.println(Thread.currentThread().getName() + ": processing " + message);
System.out.println(Thread.currentThread().getName() + ": Done");
})
.sequential()
.doOnError(e -> e.printStackTrace())
.doOnCancel(() -> {
System.out.println("disposing.....");
})
.subscribe();
}
}
我正在尝试避免使用阻塞线程模型并使用反应模型来实现高吞吐量。 我的用例是这样的:有大量传入消息。对于每条消息,我都需要做一些 I/O 而不会阻塞该线程。在这里,我在单独的线程中处理每条消息。如果应用程序被终止,我需要完成正在进行的任务并优雅地关闭。我在下面使用 Thread.sleep 来模拟密集的 I/O 操作。 代码示例如下:
public class TestReactor {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
Disposable task = Flux.range(1, 100).parallel().runOn(Schedulers.fromExecutor(executor)).doOnNext(message -> {
System.out.println(Thread.currentThread().getName()+": processing " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+": Done");
})
.sequential()
.doOnError(e->e.printStackTrace())
.doOnCancel(()->{
System.out.println("disposing.....");
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
})
.subscribe();
Thread.sleep(4000);
task.dispose();
System.out.println("Disposed. Waiting for sometime before exit.");
Thread.sleep(20000);
}
}
当我 运行 这样做时,通量似乎忽略了 executor.shutdown() 并因中断异常而出错。是否可以使用 flux 实现我的用例?
添加输出:
你犯了一个大错误:在使用响应式编程时,不要使用任何线程操作。这是一个肮脏的黑客。当您尝试在 FRP 上编写代码时,糟糕设计和代码异味的最明显标记是:
- 在运算符和副作用函数 (doOn) 中调用
try-catch
块。如果您有例外情况 - 好的,没关系。调用onError
运算符并在其中处理管道的行为。 - 在 Mono/Flux 中使用
Thread.sleep
和其他并发 API。 Weblfux API 也有很好的 API 来处理并发和多线程。
因此,关于问题及其解决方案:您的设计很糟糕 :) 您不需要处理任何一次性用品或线程中断。你只需要删除它并添加 killswitch。
所以,工作代码,它可以满足您的需求:
public class TestReactor {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
Flux.generate(() -> 0,
(state, sink) -> {
sink.next("current state = " + state);
if (state == 100) sink.complete();
return state + 1;
}
)
.parallel()
.runOn(Schedulers.fromExecutor(executor))
.doOnNext(message -> {
System.out.println(Thread.currentThread().getName() + ": processing " + message);
System.out.println(Thread.currentThread().getName() + ": Done");
})
.sequential()
.doOnError(e -> e.printStackTrace())
.doOnCancel(() -> {
System.out.println("disposing.....");
})
.subscribe();
}
}