ExecutorService threads not working as expected in RxJava code
I am using a java.util.concurrent.ExecutorService as the thread pool for a scheduler in my RxJava code:
public class Test {
    ExecutorService poolA = newFixedThreadPool(10, threadFactory("Scheduler-A-%d"));
    Scheduler schedulerA = Schedulers.from(poolA);

    private ThreadFactory threadFactory(String pattern) {
        return new ThreadFactoryBuilder()
                .setNameFormat(pattern).build();
    }

    @Test
    public void testSubscribeOn() {
        log("Starting");
        final Observable<String> obs = simple();
        log("Created");
        obs
                .doOnNext(Utils::log)
                .map(x -> x+1)
                .doOnNext(Utils::log)
                .subscribeOn(schedulerA)
                .map(x -> x+2)
                .doOnNext(Utils::log)
                .subscribe(
                        x -> log("Got " + x),
                        Throwable::printStackTrace,
                        () -> log("Completed"));
        log("exiting");
    }
}

public class Utils {
    private static long start = System.currentTimeMillis();

    public static void log(Object label) {
        System.out.println(
                System.currentTimeMillis() - start + "\t| " +
                Thread.currentThread().getName() + "\t| " +
                label);
    }
}
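I observe that the main thread exits before the scheduler threads start working, so no output from the scheduler threads is shown. As far as I know, ExecutorService threads are not daemon threads, so why does this happen?
Setup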
Linux ThinkPad-P50 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
openjdk version "14.0.2" 2020-07-14
OpenJDK Runtime Environment AdoptOpenJDK (build 14.0.2+12)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 14.0.2+12, mixed mode, sharing)
I think this has to do with the JUnit runner. Consider the following example:
public class WhileTrueJava {
    public static void main(String[] args) {
        Thread thread =
                new Thread(
                        () -> {
                            while (true) {}
                        });
        thread.setDaemon(false);
        thread.start();
        log("exit");
    }

    private static void log(Object msg) {
        System.out.println(Thread.currentThread().getName() + "-" + msg);
    }
}
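When I run this example through its main method, the process does not exit. When I run the same code under JUnit, the process terminates. I can't yet say why that is.
Example: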
class So65650913 {
    static ExecutorService poolA = newFixedThreadPool(10, threadFactory("Scheduler-A-%d"));
    static Scheduler schedulerA = Schedulers.from(poolA);

    public static void main(String[] args) {
        Utils.log("Starting");
        Observable<String> obs =
                Observable.fromCallable(
                        () -> {
                            Utils.log("fromCallable lambda called");
                            Thread.sleep(1_000);
                            Utils.log("fromCallable return value");
                            return "42";
                        });
        Utils.log("Created");
        Disposable completed =
                obs.doOnNext(Utils::log)
                        .map(x -> x + 1)
                        .doOnNext(Utils::log)
                        .subscribeOn(schedulerA)
                        .map(x -> x + 2)
                        .doOnNext(Utils::log)
                        .subscribe(
                                x -> Utils.log("Got " + x),
                                Throwable::printStackTrace,
                                () -> Utils.log("Completed"));
        Utils.log("exiting");
    }

    private static ThreadFactory threadFactory(String pattern) {
        return new ThreadFactoryBuilder().setDaemon(false).setNameFormat(pattern).build();
    }
}

class Utils {
    private static final long start = System.nanoTime();

    private Utils() {}

    public static void log(Object label) {
        System.out.println(
                (System.nanoTime() - start) / 1_000_000
                        + "\t| "
                        + Thread.currentThread().getName()
                        + "\t| "
                        + label);
    }
}
Output
0 | main | Starting
62 | main | Created
77 | main | exiting
78 | Scheduler-A-0 | fromCallable lambda called
1078 | Scheduler-A-0 | fromCallable return value
1082 | Scheduler-A-0 | 42
1082 | Scheduler-A-0 | 421
1082 | Scheduler-A-0 | 4212
1082 | Scheduler-A-0 | Got 4212
1082 | Scheduler-A-0 | Completed
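It looks like the process does not stop until the thread pool has been shut down and all running tasks have finished. Just add poolA.shutdown(); at the end of the main method and see what happens. I would say the behavior is as expected.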
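To illustrate the point about shutting the pool down, here is a minimal sketch that uses only java.util.concurrent, without RxJava or Guava. The class name ShutdownSketch, the pool size, and the 200 ms sleep are made up for illustration. It assumes the usual ExecutorService contract: shutdown() stops the pool from accepting new tasks but lets already submitted tasks finish, and awaitTermination blocks until they have finished or the timeout expires.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    // Submits one slow task to a pool of non-daemon threads, shuts the pool
    // down, and returns the recorded events in the order they happened.
    static List<String> runAndShutdown() {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        pool.execute(() -> {
            try {
                Thread.sleep(200); // simulate slow work (hypothetical duration)
                events.add("task done");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stop accepting new tasks; the task submitted above still runs.
        pool.shutdown();
        events.add("shutdown requested");

        try {
            // Block until the submitted task has completed (or 5 s pass).
            boolean finished = pool.awaitTermination(5, TimeUnit.SECONDS);
            events.add("terminated=" + finished);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        runAndShutdown().forEach(System.out::println);
    }
}
```

Without the shutdown() call, the idle worker threads of a fixed pool stay alive, and since they are non-daemon threads the JVM keeps running after main returns.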
Update
JUnit behaves differently because of the following code^1
public static void main(String... args) {
    int exitCode = execute(System.out, System.err, args).getExitCode();
    System.exit(exitCode);
}
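It looks like the process is killed with System.exit as soon as the main thread has finished executing the tests, whether or not non-daemon threads are still running. So the JUnit ConsoleLauncher is the cause of this behavior.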
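Given that the launcher exits once the test method returns, one possible workaround is to make the test itself wait for the asynchronous work. The sketch below is an assumption-laden stand-in rather than the original test: it uses a plain ExecutorService and a CountDownLatch instead of RxJava, and the names AwaitInTestSketch and runAndAwait are hypothetical. In RxJava, blockingSubscribe serves a similar purpose by blocking the calling thread until the stream terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AwaitInTestSketch {

    // Stand-in for a test method body: starts asynchronous work, then blocks
    // until the work signals completion or the timeout expires.
    static String runAndAwait() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();

        pool.execute(() -> {
            try {
                Thread.sleep(200); // simulate slow work (hypothetical duration)
                result.set("42" + 1 + 2); // mirrors the two map steps above
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown(); // always release the waiting thread
            }
        });

        try {
            // The calling thread waits here, so a runner that calls
            // System.exit after the test returns cannot cut the work short.
            boolean completed = done.await(5, TimeUnit.SECONDS);
            return completed ? result.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait());
    }
}
```

The timeout on await is a deliberate choice: a test that blocks forever on a failed pipeline is harder to diagnose than one that gives up and reports a missing result.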