关于RxJava线程调度的查询
Query on RxJava thread scheduling
我是 RxJava2 的新手。在下面的代码中,我无法理解订阅者如何在后台线程上工作,即使 Observable/Flowable 在主线程上发出并且没有指定调度程序(使用 subscribeOn(Schedulers.*) 调用) .完整代码可以在 this github repo.
中找到
@OnClick(R.id.btn_start_simple_polling)
public void onStartSimplePollingClicked() {
_log("onStartSimplePollingClicked called on "); //MAIN THREAD
final int pollCount = POLL_COUNT;
Disposable d = Observable
.interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
.map(this::_doNetworkCallAndGetStringResult)
.take(pollCount)
.doOnSubscribe(subscription -> {
_log(String.format("Start simple polling - %s", _counter)); //MAIN THREAD
})
.subscribe(taskName -> {
_log(String.format(Locale.US,
"Executing polled task [%s] now time : [xx:%02d]",
taskName,
_getSecondHand()));
});
_disposables.add(d);
}
private String _doNetworkCallAndGetStringResult(long attempt) {
try {
_log("_doNetworkCallAndGetStringResult called on "); //BACKGROUND THREAD
if (attempt == 4) {
// randomly make one event super long so we test that the repeat logic waits
// and accounts for this.
Thread.sleep(9000);
}
else {
Thread.sleep(3000);
}
} catch (InterruptedException e) {
Timber.d("Operation was interrupted");
}
_counter++;
return String.valueOf(_counter);
}
因为您没有指定订阅 RxJava 的调度程序默认为同步订阅。所以对 onSubscribe
和 doOnSubscribe
的调用发生在主线程上。
然而,Observable.interval
运算符需要隐式或显式调度程序来广播 onNext
事件。由于您没有指定调度程序,因此它默认为 Schedulers.computation()
。
在间隔触发后,它继续在同一个计算线程上调用 _doNetworkCallAndGetStringResult
,因此发生在后台。
RxJava 默认 运行 同步,但一些操作符如@Kiskae 已经告诉你间隔、延迟或其他一些操作符
如果你想 运行 管道异步你将不得不使用 observerOn 这将使 运行 另一个线程中的管道一旦被放入你的管道
/**
* Once that you set in your pipeline the observerOn all the next steps of your pipeline will be executed in another thread.
* Shall print
* First step main
* Second step RxNewThreadScheduler-2
* Third step RxNewThreadScheduler-1
*/
@Test
public void testObservableObserverOn() throws InterruptedException {
Subscription subscription = Observable.just(1)
.doOnNext(number -> System.out.println("First step " + Thread.currentThread()
.getName()))
.observeOn(Schedulers.newThread())
.doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
.getName()))
.observeOn(Schedulers.newThread())
.doOnNext(number -> System.out.println("Third step " + Thread.currentThread()
.getName()))
.subscribe();
new TestSubscriber((Observer) subscription)
.awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}
或使用 subscribeOn,它将使您的管道 运行 在您指定的线程中
/**
* Does not matter at what point in your pipeline you set your subscribeOn, once that is set in the pipeline,
* all steps will be executed in another thread.
* Shall print
* First step RxNewThreadScheduler-1
* Second step RxNewThreadScheduler-1
*/
@Test
public void testObservableSubscribeOn() throws InterruptedException {
Subscription subscription = Observable.just(1)
.doOnNext(number -> System.out.println("First step " + Thread.currentThread()
.getName()))
.subscribeOn(Schedulers.newThread())
.doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
.getName()))
.subscribe();
new TestSubscriber((Observer) subscription)
.awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}
您可以在此处查看有关异步 rxJava 的更多示例 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java
我是 RxJava2 的新手。在下面的代码中,我无法理解订阅者如何在后台线程上工作,即使 Observable/Flowable 在主线程上发出并且没有指定调度程序(使用 subscribeOn(Schedulers.*) 调用) .完整代码可以在 this github repo.
中找到@OnClick(R.id.btn_start_simple_polling)
public void onStartSimplePollingClicked() {
_log("onStartSimplePollingClicked called on "); //MAIN THREAD
final int pollCount = POLL_COUNT;
Disposable d = Observable
.interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
.map(this::_doNetworkCallAndGetStringResult)
.take(pollCount)
.doOnSubscribe(subscription -> {
_log(String.format("Start simple polling - %s", _counter)); //MAIN THREAD
})
.subscribe(taskName -> {
_log(String.format(Locale.US,
"Executing polled task [%s] now time : [xx:%02d]",
taskName,
_getSecondHand()));
});
_disposables.add(d);
}
private String _doNetworkCallAndGetStringResult(long attempt) {
try {
_log("_doNetworkCallAndGetStringResult called on "); //BACKGROUND THREAD
if (attempt == 4) {
// randomly make one event super long so we test that the repeat logic waits
// and accounts for this.
Thread.sleep(9000);
}
else {
Thread.sleep(3000);
}
} catch (InterruptedException e) {
Timber.d("Operation was interrupted");
}
_counter++;
return String.valueOf(_counter);
}
因为您没有指定订阅 RxJava 的调度程序默认为同步订阅。所以对 onSubscribe
和 doOnSubscribe
的调用发生在主线程上。
然而,Observable.interval
运算符需要隐式或显式调度程序来广播 onNext
事件。由于您没有指定调度程序,因此它默认为 Schedulers.computation()
。
在间隔触发后,它继续在同一个计算线程上调用 _doNetworkCallAndGetStringResult
,因此发生在后台。
RxJava 默认 运行 同步,但一些操作符如@Kiskae 已经告诉你间隔、延迟或其他一些操作符
如果你想 运行 管道异步你将不得不使用 observerOn 这将使 运行 另一个线程中的管道一旦被放入你的管道
/**
* Once that you set in your pipeline the observerOn all the next steps of your pipeline will be executed in another thread.
* Shall print
* First step main
* Second step RxNewThreadScheduler-2
* Third step RxNewThreadScheduler-1
*/
@Test
public void testObservableObserverOn() throws InterruptedException {
Subscription subscription = Observable.just(1)
.doOnNext(number -> System.out.println("First step " + Thread.currentThread()
.getName()))
.observeOn(Schedulers.newThread())
.doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
.getName()))
.observeOn(Schedulers.newThread())
.doOnNext(number -> System.out.println("Third step " + Thread.currentThread()
.getName()))
.subscribe();
new TestSubscriber((Observer) subscription)
.awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}
或使用 subscribeOn,它将使您的管道 运行 在您指定的线程中
/**
* Does not matter at what point in your pipeline you set your subscribeOn, once that is set in the pipeline,
* all steps will be executed in another thread.
* Shall print
* First step RxNewThreadScheduler-1
* Second step RxNewThreadScheduler-1
*/
@Test
public void testObservableSubscribeOn() throws InterruptedException {
Subscription subscription = Observable.just(1)
.doOnNext(number -> System.out.println("First step " + Thread.currentThread()
.getName()))
.subscribeOn(Schedulers.newThread())
.doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
.getName()))
.subscribe();
new TestSubscriber((Observer) subscription)
.awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}
您可以在此处查看有关异步 rxJava 的更多示例 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java