rxjava 链可观察动态

rxjava chain observalbes dynamiclly

我有一个链接的可观察对象,我是这样创建的:

Disposable disposable = currentUsedAdapter.connect(ip)
        .observeOn(AndroidSchedulers.mainThread())
        .concatMap(fallbackAdapter(ProtocolType.V2))
        .delay(500, TimeUnit.MILLISECONDS)
        .concatMap(fallbackAdapter(ProtocolType.V1))
        .subscribeWith(connectionSubscriber);

这是方法 fallbackAdapter:

private Function<Boolean, Observable<Boolean>> fallbackAdapter(ProtocolType protocolType) {
    return new Function<Boolean, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> apply(@NonNull Boolean isConnected) throws Exception {
            if (isConnected) {
                return Observable.just(true);
            } else {
                TempAdapter adapter = new TempAdapter(context, protocolType);
                return currentUsedAdapter.connect(ipAddress);
            }
        }
    };
}

目前这是静态完成的,并且工作正常。 虽然我想创建一个列表 fallbackAdapter(ProtocolType.*) 因为我只知道运行时的后备数量。

所以我创建了这个:

ArrayList<Function<Boolean, Observable<Boolean>>> adaptersList = new ArrayList<>();
adaptersList.add(fallbackAdapter(ProtocolType.V2));
...
adaptersList.add(fallbackAdapter(ProtocolType.V9));

Disposable disposable = Observable.fromIterable(adaptersList)
        .concatMap(adapter ->
                adapter.apply(true))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(connectionSubscriber);

我创建了一个可以动态更新的列表。 但是,我不确定如何将 isConnected 的值从一个适配器传递到另一个适配器。我目前将 true 传递给所有人,但其中一些应该 return false,但我不确定如何使用 [=18= 将此值从一个发射器传递到另一个发射器].

所以我的问题是我应该如何更改这个 .concatMap(adapter -> adapter.apply(true)) 这样我就不会总是发送 true 而是发送之前处理过的值适配器?

谢谢

如果对任何人有帮助... 我没有找到解决它的 rxjava 方法,所以我用一种旧的 java 时尚方式解决了...... 我创建了一个构建器 class 并向我的主要可观察对象添加了一个可观察对象,最后我返回了所有内容。 类似的东西:

public class DisposableBuilder {
    Observable<Boolean> observable;

    public DisposableBuilder() {
    }

    public void build(String ip) {
        observable = currentUsedAdapter.connect(host);
        if (adaptersNames != null) {
            for (int i = 1; i < adaptersNames.size(); i++) { // skip first adapter (currentUsedAdapter adapter)
                this.append(AdapterFactory.getAdapter(context, adaptersNames.get(i)));
            }
        }
    }

    public void append(CustomAdapter adapter) {
        observable = observable
                .delay(200, TimeUnit.MILLISECONDS)
                .concatMap(fallbackAdapter(adapter));
    }

    public Observable<Boolean> getObservable() {
        return observable;
    }
}

然后我像这样使用它:

disposableBuilder.build(ip);
this.disposable = disposableBuilder.getObservable()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(connectionSubscriber);