RxJava - 为什么执行者只使用一个线程
RxJava - Why the executors only use one thread
我已经创建了一个固定的线程池来处理每 300 毫秒发出的事件并假设该过程需要 1000 毫秒。假设多线程可以工作,但只有一个线程被重用。
如果我设置的sleepTime小于300ms,处理线程会改变,但那是没有用的。
问题:我该怎么做才能使它并发?为什么程序重用线程?
提前致谢
public static void main(String[] args) throws InterruptedException {
long sleepTime = 1000;
ExecutorService e = Executors.newFixedThreadPool(3);
Observable.interval(300, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
.flatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long pT) {
return Observable.just(pT).subscribeOn(Schedulers.from(e));
}
})
.doOnNext(new Action1<Long>() {
@Override
public void call(Long pT) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
})
.subscribe(new Action1<Long>() {
@Override
public void call(Long pT) {
System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());
}
});
Thread.sleep(50000);
e.shutdownNow();
}
日志
i am 0in thread:pool-1-thread-1
i am 1in thread:pool-1-thread-1
i am 2in thread:pool-1-thread-1
i am 3in thread:pool-1-thread-1
i am 4in thread:pool-1-thread-1
i am 5in thread:pool-1-thread-1
i am 6in thread:pool-1-thread-1
i am 7in thread:pool-1-thread-1
i am 8in thread:pool-1-thread-1
i am 9in thread:pool-1-thread-1
i am 10in thread:pool-1-thread-1
i am 11in thread:pool-1-thread-1
根据我对您的代码的理解,生产者的生产速度比订阅者快。然而 Observable<Long> interval(long interval, TimeUnit unit)
实际上不支持 Backpressure
。文档指出
This operator does not support backpressure as it uses time. If the
downstream needs a slower it should slow the timer or use something
like {@link #onBackpressureDrop}.
如果你的处理速度真的比生产者慢,你可以在你的订阅者代码中做这样的事情
.subscribe(new Action1<Long>() {
@Override
public void call(Long pT) {
e.submit(new Runnable() {
System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());
}
}
});
相反
.subscribeOn(Schedulers.computation())
尝试
.observeOn(Schedulers.computation())
我之前用 Rx 玩并发的这个例子作为例子非常好
public class ObservableZip {
private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Async:", start, result));
}
public void showResult(String transactionType, long start, String result) {
System.out.println(result + " " +
transactionType + String.valueOf(System.currentTimeMillis() - start));
}
public Observable<String> obAsyncString() {
return Observable.just("")
.observeOn(scheduler)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
public Observable<String> obAsyncString1() {
return Observable.just("")
.observeOn(scheduler1)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
public Observable<String> obAsyncString2() {
return Observable.just("")
.observeOn(scheduler2)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
}
我在 GitHub 中找到了答案!
内部 observable 确实在 multi-thread 上发射,但接下来的后续行动却没有。如果我想要它并行,我应该在内部 observable 中进行。
我已经创建了一个固定的线程池来处理每 300 毫秒发出的事件并假设该过程需要 1000 毫秒。假设多线程可以工作,但只有一个线程被重用。
如果我设置的sleepTime小于300ms,处理线程会改变,但那是没有用的。
问题:我该怎么做才能使它并发?为什么程序重用线程?
提前致谢
public static void main(String[] args) throws InterruptedException {
long sleepTime = 1000;
ExecutorService e = Executors.newFixedThreadPool(3);
Observable.interval(300, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
.flatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long pT) {
return Observable.just(pT).subscribeOn(Schedulers.from(e));
}
})
.doOnNext(new Action1<Long>() {
@Override
public void call(Long pT) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
})
.subscribe(new Action1<Long>() {
@Override
public void call(Long pT) {
System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());
}
});
Thread.sleep(50000);
e.shutdownNow();
}
日志
i am 0in thread:pool-1-thread-1
i am 1in thread:pool-1-thread-1
i am 2in thread:pool-1-thread-1
i am 3in thread:pool-1-thread-1
i am 4in thread:pool-1-thread-1
i am 5in thread:pool-1-thread-1
i am 6in thread:pool-1-thread-1
i am 7in thread:pool-1-thread-1
i am 8in thread:pool-1-thread-1
i am 9in thread:pool-1-thread-1
i am 10in thread:pool-1-thread-1
i am 11in thread:pool-1-thread-1
根据我对您的代码的理解,生产者的生产速度比订阅者快。然而 Observable<Long> interval(long interval, TimeUnit unit)
实际上不支持 Backpressure
。文档指出
This operator does not support backpressure as it uses time. If the downstream needs a slower it should slow the timer or use something like {@link #onBackpressureDrop}.
如果你的处理速度真的比生产者慢,你可以在你的订阅者代码中做这样的事情
.subscribe(new Action1<Long>() {
@Override
public void call(Long pT) {
e.submit(new Runnable() {
System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());
}
}
});
相反
.subscribeOn(Schedulers.computation())
尝试
.observeOn(Schedulers.computation())
我之前用 Rx 玩并发的这个例子作为例子非常好
public class ObservableZip {
private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Async:", start, result));
}
public void showResult(String transactionType, long start, String result) {
System.out.println(result + " " +
transactionType + String.valueOf(System.currentTimeMillis() - start));
}
public Observable<String> obAsyncString() {
return Observable.just("")
.observeOn(scheduler)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
public Observable<String> obAsyncString1() {
return Observable.just("")
.observeOn(scheduler1)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
public Observable<String> obAsyncString2() {
return Observable.just("")
.observeOn(scheduler2)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
}
我在 GitHub 中找到了答案!
内部 observable 确实在 multi-thread 上发射,但接下来的后续行动却没有。如果我想要它并行,我应该在内部 observable 中进行。