RxJava:可观察和默认线程
RxJava: Observable and default thread
我有以下代码:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
这是输出:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
重要细节:我正在从另一个线程(Android 中的 main
线程)调用 subscribe
方法。
所以看起来 Observable
class 是同步的,默认情况下它在发出事件的同一线程上执行所有操作(map
+ 通知订阅者)(s.onNext
),对吧?我想知道......这是故意的行为还是我误解了什么?实际上,我期望至少 onNext
和 onComplete
回调将在调用者的线程上调用,而不是在发出事件的线程上调用。我是否正确理解在这种特殊情况下实际调用者的线程无关紧要?至少在异步生成事件时。
另一个问题 - 如果我从一些外部来源接收到一些 Observable 作为参数怎么办(即我不是自己生成它)......作为它的用户我没有办法检查它是否是同步的还是异步的,我只需要明确指定我想通过 subscribeOn
和 observeOn
方法接收回调的位置,对吗?
谢谢!
RxJava 对并发没有意见。如果您不使用任何其他机制,如 observeOn/subscribeOn,它将在订阅线程上产生值。请不要在运算符中使用 Thread 等低级构造,否则可能会破坏契约。
由于使用了 Thread,onNext 将从调用 Thread ('background-thread-1') 调用。订阅发生在调用 (UI-Thread) 上。链下的每个运算符都将从 'background-thread-1'-calling-Thread 调用。订阅 onNext 也将从 'background-thread-1'.
调用
如果你想生成不在调用线程上的值,请使用:subscribeOn。如果您想将线程切换回主线,请在链中的某处使用 observeOn。最有可能在订阅之前。
示例:
Observable.just(1,2,3) // creation of observable happens on Computational-Threads
.subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Nearest to source wins
.map(integer -> integer) // map happens on Computational-Threads
.observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
.subscribe(integer -> {
// called from mainThread
});
这是一个很好的解释。
http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html
我有以下代码:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
这是输出:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
重要细节:我正在从另一个线程(Android 中的 main
线程)调用 subscribe
方法。
所以看起来 Observable
class 是同步的,默认情况下它在发出事件的同一线程上执行所有操作(map
+ 通知订阅者)(s.onNext
),对吧?我想知道......这是故意的行为还是我误解了什么?实际上,我期望至少 onNext
和 onComplete
回调将在调用者的线程上调用,而不是在发出事件的线程上调用。我是否正确理解在这种特殊情况下实际调用者的线程无关紧要?至少在异步生成事件时。
另一个问题 - 如果我从一些外部来源接收到一些 Observable 作为参数怎么办(即我不是自己生成它)......作为它的用户我没有办法检查它是否是同步的还是异步的,我只需要明确指定我想通过 subscribeOn
和 observeOn
方法接收回调的位置,对吗?
谢谢!
RxJava 对并发没有意见。如果您不使用任何其他机制,如 observeOn/subscribeOn,它将在订阅线程上产生值。请不要在运算符中使用 Thread 等低级构造,否则可能会破坏契约。
由于使用了 Thread,onNext 将从调用 Thread ('background-thread-1') 调用。订阅发生在调用 (UI-Thread) 上。链下的每个运算符都将从 'background-thread-1'-calling-Thread 调用。订阅 onNext 也将从 'background-thread-1'.
调用如果你想生成不在调用线程上的值,请使用:subscribeOn。如果您想将线程切换回主线,请在链中的某处使用 observeOn。最有可能在订阅之前。
示例:
Observable.just(1,2,3) // creation of observable happens on Computational-Threads
.subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Nearest to source wins
.map(integer -> integer) // map happens on Computational-Threads
.observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
.subscribe(integer -> {
// called from mainThread
});
这是一个很好的解释。 http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html