从 while 循环创建 Flowable
Create Flowable from while loop
我是 RxJava 的新手,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。
首先自己写了dao,处理InputStream,提供指定范围内的Item。目前它只是在列表中收集数据,但我想使用 flowable 一个一个地提供项目;目前它提供 Maybe<List<Item>>
。还有一些错误需要传输到更高级别(数据源)。比如EndOfFile,通知DataSource数据缓存完毕;
Dao.class
:
List<Item> loadRange(int start, int number) throws ... {
...
while(...) {
...
//TODO contribute item to flowable
resultList.add(new Item(...))
}
return resultList;
}
Maybe<List<Item>>
方法刚刚创建 Maybe.fromCallable()
;
请帮帮我!
像这样的东西应该适用于此:
Flowable<Item> loadRange(int start, int number) {
return Flowable.create(emitter -> {
try {
while (...){
emitter.onNext(new Item());
}
emitter.onComplete();
} catch (IOException e) {
emitter.onError(e);
}
}, BackpressureStrategy.BUFFER);
}
我假设一旦循环完成,您还想将错误发送到下游,而不是处理方法签名。您也可以更改 BackPressureStrategy
以适合您的用例,即 DROP
、LATEST
等。
由于您是 RxJava 的新手,匿名 class 将是:
Flowable<Item> loadRange(int start, int number) {
return Flowable.create(new FlowableOnSubscribe<Item>() {
@Override public void subscribe(FlowableEmitter<Item> emitter) {
try {
while (...){
emitter.onNext(new Item());
}
emitter.onComplete();
} catch (IOException e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}
我是 RxJava 的新手,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。
首先自己写了dao,处理InputStream,提供指定范围内的Item。目前它只是在列表中收集数据,但我想使用 flowable 一个一个地提供项目;目前它提供 Maybe<List<Item>>
。还有一些错误需要传输到更高级别(数据源)。比如EndOfFile,通知DataSource数据缓存完毕;
Dao.class
:
List<Item> loadRange(int start, int number) throws ... {
...
while(...) {
...
//TODO contribute item to flowable
resultList.add(new Item(...))
}
return resultList;
}
Maybe<List<Item>>
方法刚刚创建 Maybe.fromCallable()
;
请帮帮我!
像这样的东西应该适用于此:
Flowable<Item> loadRange(int start, int number) {
return Flowable.create(emitter -> {
try {
while (...){
emitter.onNext(new Item());
}
emitter.onComplete();
} catch (IOException e) {
emitter.onError(e);
}
}, BackpressureStrategy.BUFFER);
}
我假设一旦循环完成,您还想将错误发送到下游,而不是处理方法签名。您也可以更改 BackPressureStrategy
以适合您的用例,即 DROP
、LATEST
等。
由于您是 RxJava 的新手,匿名 class 将是:
Flowable<Item> loadRange(int start, int number) {
return Flowable.create(new FlowableOnSubscribe<Item>() {
@Override public void subscribe(FlowableEmitter<Item> emitter) {
try {
while (...){
emitter.onNext(new Item());
}
emitter.onComplete();
} catch (IOException e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}