为什么 onSubscribe 在 rxjava 中不起作用?
Why onSubscribe does not work in rxjava?
当我 运行 在代码下方时,如果我不写 observeOn
行,应用程序会崩溃,因为 getView().showBlockLayout(isBlock);
调用了一个试图隐藏或显示布局的方法。
但我试图将 observeOn(AndroidSchedulers.mainThread())
以下更改为 subscribeOn(AndroidSchedulers.mainThread())
并且应用程序再次崩溃!
subscription.add(UserStore.getInstance().getBlockObservable(databaseHelper.getConference().getUserChatId())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean isBlock) {
getView().showBlockLayout(isBlock);
databaseHelper.getConference().setBlock(isBlock);
mConferenceModel.setBlock(isBlock);
}
}));
我也测试了这个:
subscription.add(UserStore.getInstance().getBlockObservable(databaseHelper.getConference().getUserChatId())
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean isBlock) {
getView().showBlockLayout(isBlock);
databaseHelper.getConference().setBlock(isBlock);
mConferenceModel.setBlock(isBlock);
}
}));
出乎意料的是它成功了而且没有崩溃!我没有在 getBlockObservable 方法中使用 subscribeOn(因为我知道我们可以设置一次)
这是我的 UserStore
class
PublishSubject<Pair<String,Boolean>> mObservableBlock;
private UserStore(){
mObservableBlock = PublishSubject.create();
mInstance = this;
}
public static UserStore getInstance() {
if(mInstance == null)
new UserStore();
return mInstance;
}
public Observable<Boolean> getBlockObservable(final String userId){
return mObservableBlock
.observeOn(Schedulers.computation())
.filter(new Func1<Pair<String,Boolean>, Boolean>() {
@Override
public Boolean call(Pair<String,Boolean> s) {
if(userId.equals(s.first))
return true;
return false;
}
}).map(new Func1< Pair<String, Boolean>, Boolean>() {
@Override
public Boolean call(Pair<String, Boolean> UserBlock) {
return UserBlock.second;
}
});
}
public void publishBlockedUser(String userId,boolean isBlock){
mObservableBlock.onNext(new Pair<String, Boolean>(userId,isBlock));
}
这是我在 gradle
中导入 rxjava 依赖项的方式
compile 'io.reactivex:rxjava:1.1.5'
compile 'io.reactivex:rxandroid:1.2.0'
如本文所述medium artice:
One important fact is that subscribeOn does not work with Subjects.
因此您不能对主题使用 subscribeOn,我们必须在订阅前使用 observerOn(AndroidSchedulers.mainThread())
。
所以之后所有下游方法都会在 mainThread 上调用。
检查这个medium artice
当我 运行 在代码下方时,如果我不写 observeOn
行,应用程序会崩溃,因为 getView().showBlockLayout(isBlock);
调用了一个试图隐藏或显示布局的方法。
但我试图将 observeOn(AndroidSchedulers.mainThread())
以下更改为 subscribeOn(AndroidSchedulers.mainThread())
并且应用程序再次崩溃!
subscription.add(UserStore.getInstance().getBlockObservable(databaseHelper.getConference().getUserChatId())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean isBlock) {
getView().showBlockLayout(isBlock);
databaseHelper.getConference().setBlock(isBlock);
mConferenceModel.setBlock(isBlock);
}
}));
我也测试了这个:
subscription.add(UserStore.getInstance().getBlockObservable(databaseHelper.getConference().getUserChatId())
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean isBlock) {
getView().showBlockLayout(isBlock);
databaseHelper.getConference().setBlock(isBlock);
mConferenceModel.setBlock(isBlock);
}
}));
出乎意料的是它成功了而且没有崩溃!我没有在 getBlockObservable 方法中使用 subscribeOn(因为我知道我们可以设置一次)
这是我的 UserStore
class
PublishSubject<Pair<String,Boolean>> mObservableBlock;
private UserStore(){
mObservableBlock = PublishSubject.create();
mInstance = this;
}
public static UserStore getInstance() {
if(mInstance == null)
new UserStore();
return mInstance;
}
public Observable<Boolean> getBlockObservable(final String userId){
return mObservableBlock
.observeOn(Schedulers.computation())
.filter(new Func1<Pair<String,Boolean>, Boolean>() {
@Override
public Boolean call(Pair<String,Boolean> s) {
if(userId.equals(s.first))
return true;
return false;
}
}).map(new Func1< Pair<String, Boolean>, Boolean>() {
@Override
public Boolean call(Pair<String, Boolean> UserBlock) {
return UserBlock.second;
}
});
}
public void publishBlockedUser(String userId,boolean isBlock){
mObservableBlock.onNext(new Pair<String, Boolean>(userId,isBlock));
}
这是我在 gradle
中导入 rxjava 依赖项的方式compile 'io.reactivex:rxjava:1.1.5'
compile 'io.reactivex:rxandroid:1.2.0'
如本文所述medium artice:
One important fact is that subscribeOn does not work with Subjects.
因此您不能对主题使用 subscribeOn,我们必须在订阅前使用 observerOn(AndroidSchedulers.mainThread())
。
所以之后所有下游方法都会在 mainThread 上调用。
检查这个medium artice