为什么这个通量不在多线程中执行?
Why this flux isn't executed in multiple threads?
我有以下例子:
Flux.just(1,2,3,4,5,6,7,8)
.flatMap(integer -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return Mono.just(integer);
}, 5)
.repeat()
.subscribeOn(Schedulers.parallel())
.subscribe();
日志如下:
val:4, thread:14
val:5, thread:14
val:6, thread:14
val:7, thread:14
val:8, thread:14
val:1, thread:14
val:2, thread:14
val:3, thread:14
为什么到处都是同一个线程??我如何修改示例,使其在多个线程中执行?
您需要使用如下的parallel
操作:
Flux.just(1,2,3,4,5,6,7,8)
.parallel(2) // mention number of threads
.runOn(Schedulers.parallel())
.map(integer -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return integer;
})
.subscribe();
如果你想让每个重复的通量都在不同的线程上,你可以把publishOn
移到前面,像这样:
Flux.just(1,2,3,4,5,6,7,8)
.publishOn(Schedulers.parallel()) // <- before
.flatMap(integer -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return Mono.just(integer);
}, 5)
.repeat()
.subscribe();
现在的输出是这样的:
val:1, thread:20
val:2, thread:20
val:3, thread:20
val:4, thread:20
val:5, thread:20
val:6, thread:20
val:7, thread:20
val:8, thread:20
val:1, thread:13
val:2, thread:13
val:3, thread:13
val:4, thread:13
val:5, thread:13
val:6, thread:13
val:7, thread:13
val:8, thread:13
如果你想让每个整数在不同的线程中,你可以这样做:
Flux.just(1,2,3,4,5,6,7,8)
.publishOn(Schedulers.parallel()) // <- Each flux can be published in a different thread
.flatMap(integer -> {
return Mono.fromCallable(() -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return integer;
}).publishOn(Schedulers.parallel()); // <- Each Mono processing every integer can be processed in a different thread
})
.repeat()
.subscribe();
输出变为:
val:3, thread:16
val:2, thread:15
val:7, thread:20
val:8, thread:13
val:5, thread:18
val:6, thread:19
val:3, thread:17
val:5, thread:19
val:6, thread:20
val:1, thread:15
val:8, thread:14
val:4, thread:18
val:7, thread:13
我有以下例子:
Flux.just(1,2,3,4,5,6,7,8)
.flatMap(integer -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return Mono.just(integer);
}, 5)
.repeat()
.subscribeOn(Schedulers.parallel())
.subscribe();
日志如下:
val:4, thread:14
val:5, thread:14
val:6, thread:14
val:7, thread:14
val:8, thread:14
val:1, thread:14
val:2, thread:14
val:3, thread:14
为什么到处都是同一个线程??我如何修改示例,使其在多个线程中执行?
您需要使用如下的parallel
操作:
Flux.just(1,2,3,4,5,6,7,8)
.parallel(2) // mention number of threads
.runOn(Schedulers.parallel())
.map(integer -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return integer;
})
.subscribe();
如果你想让每个重复的通量都在不同的线程上,你可以把publishOn
移到前面,像这样:
Flux.just(1,2,3,4,5,6,7,8)
.publishOn(Schedulers.parallel()) // <- before
.flatMap(integer -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return Mono.just(integer);
}, 5)
.repeat()
.subscribe();
现在的输出是这样的:
val:1, thread:20
val:2, thread:20
val:3, thread:20
val:4, thread:20
val:5, thread:20
val:6, thread:20
val:7, thread:20
val:8, thread:20
val:1, thread:13
val:2, thread:13
val:3, thread:13
val:4, thread:13
val:5, thread:13
val:6, thread:13
val:7, thread:13
val:8, thread:13
如果你想让每个整数在不同的线程中,你可以这样做:
Flux.just(1,2,3,4,5,6,7,8)
.publishOn(Schedulers.parallel()) // <- Each flux can be published in a different thread
.flatMap(integer -> {
return Mono.fromCallable(() -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return integer;
}).publishOn(Schedulers.parallel()); // <- Each Mono processing every integer can be processed in a different thread
})
.repeat()
.subscribe();
输出变为:
val:3, thread:16
val:2, thread:15
val:7, thread:20
val:8, thread:13
val:5, thread:18
val:6, thread:19
val:3, thread:17
val:5, thread:19
val:6, thread:20
val:1, thread:15
val:8, thread:14
val:4, thread:18
val:7, thread:13