rxjava 链可观察动态
rxjava chain observalbes dynamiclly
我有一个链接的可观察对象,我是这样创建的:
Disposable disposable = currentUsedAdapter.connect(ip)
.observeOn(AndroidSchedulers.mainThread())
.concatMap(fallbackAdapter(ProtocolType.V2))
.delay(500, TimeUnit.MILLISECONDS)
.concatMap(fallbackAdapter(ProtocolType.V1))
.subscribeWith(connectionSubscriber);
这是方法 fallbackAdapter
:
private Function<Boolean, Observable<Boolean>> fallbackAdapter(ProtocolType protocolType) {
return new Function<Boolean, Observable<Boolean>>() {
@Override
public Observable<Boolean> apply(@NonNull Boolean isConnected) throws Exception {
if (isConnected) {
return Observable.just(true);
} else {
TempAdapter adapter = new TempAdapter(context, protocolType);
return currentUsedAdapter.connect(ipAddress);
}
}
};
}
目前这是静态完成的,并且工作正常。
虽然我想创建一个列表 fallbackAdapter(ProtocolType.*)
因为我只知道运行时的后备数量。
所以我创建了这个:
ArrayList<Function<Boolean, Observable<Boolean>>> adaptersList = new ArrayList<>();
adaptersList.add(fallbackAdapter(ProtocolType.V2));
...
adaptersList.add(fallbackAdapter(ProtocolType.V9));
Disposable disposable = Observable.fromIterable(adaptersList)
.concatMap(adapter ->
adapter.apply(true))
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(connectionSubscriber);
我创建了一个可以动态更新的列表。
但是,我不确定如何将 isConnected
的值从一个适配器传递到另一个适配器。我目前将 true
传递给所有人,但其中一些应该 return false
,但我不确定如何使用 [=18= 将此值从一个发射器传递到另一个发射器].
所以我的问题是我应该如何更改这个 .concatMap(adapter -> adapter.apply(true))
这样我就不会总是发送 true
而是发送之前处理过的值适配器?
谢谢
如果对任何人有帮助...
我没有找到解决它的 rxjava 方法,所以我用一种旧的 java 时尚方式解决了......
我创建了一个构建器 class 并向我的主要可观察对象添加了一个可观察对象,最后我返回了所有内容。
类似的东西:
public class DisposableBuilder {
Observable<Boolean> observable;
public DisposableBuilder() {
}
public void build(String ip) {
observable = currentUsedAdapter.connect(host);
if (adaptersNames != null) {
for (int i = 1; i < adaptersNames.size(); i++) { // skip first adapter (currentUsedAdapter adapter)
this.append(AdapterFactory.getAdapter(context, adaptersNames.get(i)));
}
}
}
public void append(CustomAdapter adapter) {
observable = observable
.delay(200, TimeUnit.MILLISECONDS)
.concatMap(fallbackAdapter(adapter));
}
public Observable<Boolean> getObservable() {
return observable;
}
}
然后我像这样使用它:
disposableBuilder.build(ip);
this.disposable = disposableBuilder.getObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(connectionSubscriber);
我有一个链接的可观察对象,我是这样创建的:
Disposable disposable = currentUsedAdapter.connect(ip)
.observeOn(AndroidSchedulers.mainThread())
.concatMap(fallbackAdapter(ProtocolType.V2))
.delay(500, TimeUnit.MILLISECONDS)
.concatMap(fallbackAdapter(ProtocolType.V1))
.subscribeWith(connectionSubscriber);
这是方法 fallbackAdapter
:
private Function<Boolean, Observable<Boolean>> fallbackAdapter(ProtocolType protocolType) {
return new Function<Boolean, Observable<Boolean>>() {
@Override
public Observable<Boolean> apply(@NonNull Boolean isConnected) throws Exception {
if (isConnected) {
return Observable.just(true);
} else {
TempAdapter adapter = new TempAdapter(context, protocolType);
return currentUsedAdapter.connect(ipAddress);
}
}
};
}
目前这是静态完成的,并且工作正常。
虽然我想创建一个列表 fallbackAdapter(ProtocolType.*)
因为我只知道运行时的后备数量。
所以我创建了这个:
ArrayList<Function<Boolean, Observable<Boolean>>> adaptersList = new ArrayList<>();
adaptersList.add(fallbackAdapter(ProtocolType.V2));
...
adaptersList.add(fallbackAdapter(ProtocolType.V9));
Disposable disposable = Observable.fromIterable(adaptersList)
.concatMap(adapter ->
adapter.apply(true))
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(connectionSubscriber);
我创建了一个可以动态更新的列表。
但是,我不确定如何将 isConnected
的值从一个适配器传递到另一个适配器。我目前将 true
传递给所有人,但其中一些应该 return false
,但我不确定如何使用 [=18= 将此值从一个发射器传递到另一个发射器].
所以我的问题是我应该如何更改这个 .concatMap(adapter -> adapter.apply(true))
这样我就不会总是发送 true
而是发送之前处理过的值适配器?
谢谢
如果对任何人有帮助... 我没有找到解决它的 rxjava 方法,所以我用一种旧的 java 时尚方式解决了...... 我创建了一个构建器 class 并向我的主要可观察对象添加了一个可观察对象,最后我返回了所有内容。 类似的东西:
public class DisposableBuilder {
Observable<Boolean> observable;
public DisposableBuilder() {
}
public void build(String ip) {
observable = currentUsedAdapter.connect(host);
if (adaptersNames != null) {
for (int i = 1; i < adaptersNames.size(); i++) { // skip first adapter (currentUsedAdapter adapter)
this.append(AdapterFactory.getAdapter(context, adaptersNames.get(i)));
}
}
}
public void append(CustomAdapter adapter) {
observable = observable
.delay(200, TimeUnit.MILLISECONDS)
.concatMap(fallbackAdapter(adapter));
}
public Observable<Boolean> getObservable() {
return observable;
}
}
然后我像这样使用它:
disposableBuilder.build(ip);
this.disposable = disposableBuilder.getObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(connectionSubscriber);