结合以前的观察

Combine previous observable

我正在尝试使用 RxJava、RxAndroid 和 Mosby3 将两种形式的插入合并为一个形式,但我找不到使其工作的方法。

我的结构:

public final class CheckinIntent {

    private final CheckinCommand checkinCommand;
    private final Bitmap signature;

    public CheckinIntent(CheckinCommand checkinCommand, Bitmap signature) {
        this.checkinCommand = checkinCommand;
        this.signature = signature;
    }

    public CheckinCommand getCheckinCommand() {
        return checkinCommand;
    }

    public Bitmap getSignature() {
        return signature;
    }
}

我在哪里触发我的意图(MVI 模式):

final Observable<Bitmap> signatureObservable = Observable.just(BitmapFactory.decodeFile(storage.getFile("signs", booking.getBookingId()).getAbsolutePath()));
        final Observable<CheckinCommand> checkinCommandObservable = Observable.just(new CheckinCommand(booking.getBookingId(), booking.getUserId(), booking.getPartnerId(), userDetailsTextView.getText().toString(), "google.com"));

        final Observable<CheckinIntent> intentObservable = Observable.zip(signatureObservable, checkinCommandObservable, (image, command) -> new CheckinIntent(command, image));

        return saveButtonClickObservable
                .flatMap(bla -> intentObservable);

并将它们绑定在一起:

 @Override
    protected void bindIntents() {
        Observable<CheckinViewState> checkinViewStateObservable =
                intent(CheckinView::sendCheckin)
                        .flatMap(checkinIntent -> imageRepository.uploadImage(checkinIntent.getSignature())
                        .flatMap(command ->  bookingRepository.doCheckin(command) <------ PROBLEM HERE, HOW CAN I ACCESS THE COMMAND FROM ABOVE ??
                                .subscribeOn(Schedulers.from(threadExecutor))
                                .map(CheckinViewState.Success::new)
                                .cast(CheckinViewState.class)
                                .startWith(new CheckinViewState.LoadingState())
                                .onErrorReturn(CheckinViewState.ErrorState::new))
                        .observeOn(postExecutionThread.getScheduler());

        subscribeViewState(checkinViewStateObservable, CheckinView::render);
}

Observable<CnhImageResponse> uploadImage(Bitmap bitmap);

我的问题是,我的 uploadImage returns 一个以字符串结尾的内部结构,但是,我怎样才能得到返回的字符串,将它添加到我的 command 对象(设置返回的URL 在此对象中)并继续流程(将我的命令发送到云端)?

谢谢!

直接在第一个 flatMap 中的可观察对象上的 flatMap。在那种情况下,您同时引用了 checkinIntent 和 command

 @Override
 protected void bindIntents() {
        Observable<CheckinViewState> checkinViewStateObservable =
                intent(CheckinView::sendCheckin)
                        .flatMap(checkinIntent -> { 
                          return imageRepository.uploadImage(checkinIntent.getSignature()
                                                .flatMap(imageResponse ->  bookingRepository.doCheckin(command) <-- Now you have access to both, command and CnhImageResponse 
                         }) 
                         .subscribeOn(Schedulers.from(threadExecutor))
                         .map(CheckinViewState.Success::new)
                         .cast(CheckinViewState.class)
                         .startWith(new CheckinViewState.LoadingState())
                         .onErrorReturn(CheckinViewState.ErrorState::new))
                         .observeOn(postExecutionThread.getScheduler());

        subscribeViewState(checkinViewStateObservable, CheckinView::render);
}

替代解决方案:将 Pair<CheckinIntent, Command>bookingRepository.doCheckin(...) 传递给 Observable,如下所示:

@Override
protected void bindIntents() {
        Observable<CheckinViewState> checkinViewStateObservable =
                intent(CheckinView::sendCheckin)
                        .flatMap(checkinIntent -> imageRepository.uploadImage(checkinIntent.getSignature()
                                                                 .map(imageResponse -> Pair.create(checkinIntent, imageResponse))) // Returns a Pair<CheckinIntent, CnhImageResponse>
                        .flatMap(pair ->  bookingRepository.doCheckin(pair.first) <-- Now you can access the pair holding both information
                                .subscribeOn(Schedulers.from(threadExecutor))
                                .map(CheckinViewState.Success::new)
                                .cast(CheckinViewState.class)
                                .startWith(new CheckinViewState.LoadingState())
                                .onErrorReturn(CheckinViewState.ErrorState::new))
                        .observeOn(postExecutionThread.getScheduler());

        subscribeViewState(checkinViewStateObservable, CheckinView::render);
}

其他一些注意事项:

在 MVI 中,您几乎总是希望 switchMap() 而不是 flatMap()。 switchMap 取消订阅之前的订阅,而 flatMap 没有。这意味着如果您像在代码中所做的那样 flatMap 被剪断,并且如果由于某种原因在旧的 checkinIntent 尚未完成时触发了新的 checkinIntent (即 imageRepository.uploadImage() 仍在进行中),您最终会拥有两个将调用 CheckinView::render 的流,因为第一个流仍在继续工作并通过已建立的可观察流向下发出结果。 switchMap() 通过在开始 "switchMaping" 新意图之前取消订阅第一个(未完成的)意图来防止这种情况,这样您当时只有一个流。

构建 CheckinIntent 的方式应移至 Presenter。对于 "dump" 视图,这有点太多了 "logic"。 Observable.just(BitmapFactory.decodeFile(...)) 在主线程上也是 运行。我建议使用 Observable.fromCallable( () -> BitmapFactory.decodeFile(...)),因为后者会延迟他的 "work"(位图解码),直到实际订阅此可观察对象,然后您才能应用后台调度程序。 Observable.just() 基本等同于:

Bitmap bitmap = BitmapFactory.decodeFile(...); // Here is the "hard work" already done, even if observable below is not subscribed at all.
Observable.just(bitmap);