从 while 循环创建 Flowable

Create Flowable from while loop

我是 RxJava 的新手,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。

首先自己写了dao,处理InputStream,提供指定范围内的Item。目前它只是在列表中收集数据,但我想使用 flowable 一个一个地提供项目;目前它提供 Maybe<List<Item>>。还有一些错误需要传输到更高级别(数据源)。比如EndOfFile,通知DataSource数据缓存完毕;

Dao.class:

List<Item> loadRange(int start, int number) throws ... {
    ...
    while(...) {
        ...
        //TODO contribute item to flowable
        resultList.add(new Item(...)) 

    }
    return resultList;
}

Maybe<List<Item>> 方法刚刚创建 Maybe.fromCallable();

请帮帮我!

像这样的东西应该适用于此:

Flowable<Item> loadRange(int start, int number) {
        return Flowable.create(emitter -> {
            try {
                while (...){
                    emitter.onNext(new Item());
                }
                emitter.onComplete();
            } catch (IOException e) {
                emitter.onError(e);
            }
        }, BackpressureStrategy.BUFFER);
    }

我假设一旦循环完成,您还想将错误发送到下游,而不是处理方法签名。您也可以更改 BackPressureStrategy 以适合您的用例,即 DROPLATEST 等。

由于您是 RxJava 的新手,匿名 class 将是:

Flowable<Item> loadRange(int start, int number) {
        return Flowable.create(new FlowableOnSubscribe<Item>() {
            @Override public void subscribe(FlowableEmitter<Item> emitter) {
                try {
                    while (...){
                        emitter.onNext(new Item());
                    }
                    emitter.onComplete();
                } catch (IOException e) {
                    emitter.onError(e);
                }
            }
        }, BackpressureStrategy.BUFFER);
    }