RxJava 将新对象添加到流中
RxJava Add new objects into stream
我正在尝试将新对象注入到可能存在的流中。
假设我有这个:
ItemUtils.java
public static void processItems(List<Item> items) {
Observable.from(items)
.subscribeOn(Schedulers.io())
.flatMap(ItemUtils::doSomeHeavyProcessing)
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
在另一个 class 中:
List<Item> items = new ArrayList<>();
public void onClick() {
processItems(list);
}
每次不耐烦的用户按下按钮时,是否可以将项目注入一个独特的项目流?
有可能,这个问题不够精确,所以这个答案很笼统。
基本思路是使用 [concatMap
](http://reactivex.io/RxJava/javadoc/rx/Observable.html#concat(rx.Observable, rx.Observable)) 或此运算符的变体。
因此,例如,取决于代码真正需要执行的操作可能是:
public static void processItems(List<Item> items) {
Observable<ProcessdItem> processedItems = Observable.from(items)
.subscribeOn(Schedulers.io())
.flatMap(ItemUtils::doSomeHeavyProcessing)
.observeOn(AndroidSchedulers.mainThread());
Observable.concat(
Observable.just(ProcessedItems.preProcessed()),
processedItems
).subscribe();
}
或者在嵌套的 observable 中。
public static void processItems(List<Item> items) {
Observable.from(items)
.subscribeOn(Schedulers.io())
.flatMap(i ->
Observable.just(ProcessedItems.preProcessed())
.concatWith(ItemUtils::doSomeHeavyProcessing))
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
如果 List items 是一种队列,当 processItems 已经启动时用户可以在其中添加更多项目,您应该使用 Subjects
Queue<Item> items = new LinkedList<>();
PublishSubject<Item> subject = PublishSubject.create();
...
//subscribe once and supply new items in onClick
subject.subscribeOn(Schedulers.io())
.flatMap(ItemUtils::doSomeHeavyProcessing)
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
...
public void onClick() {
while(!queue.isEmpty()){
subject.onNext(queue.remove());
}
}
我正在尝试将新对象注入到可能存在的流中。
假设我有这个:
ItemUtils.java
public static void processItems(List<Item> items) {
Observable.from(items)
.subscribeOn(Schedulers.io())
.flatMap(ItemUtils::doSomeHeavyProcessing)
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
在另一个 class 中:
List<Item> items = new ArrayList<>();
public void onClick() {
processItems(list);
}
每次不耐烦的用户按下按钮时,是否可以将项目注入一个独特的项目流?
有可能,这个问题不够精确,所以这个答案很笼统。
基本思路是使用 [concatMap
](http://reactivex.io/RxJava/javadoc/rx/Observable.html#concat(rx.Observable, rx.Observable)) 或此运算符的变体。
因此,例如,取决于代码真正需要执行的操作可能是:
public static void processItems(List<Item> items) {
Observable<ProcessdItem> processedItems = Observable.from(items)
.subscribeOn(Schedulers.io())
.flatMap(ItemUtils::doSomeHeavyProcessing)
.observeOn(AndroidSchedulers.mainThread());
Observable.concat(
Observable.just(ProcessedItems.preProcessed()),
processedItems
).subscribe();
}
或者在嵌套的 observable 中。
public static void processItems(List<Item> items) {
Observable.from(items)
.subscribeOn(Schedulers.io())
.flatMap(i ->
Observable.just(ProcessedItems.preProcessed())
.concatWith(ItemUtils::doSomeHeavyProcessing))
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
如果 List items 是一种队列,当 processItems 已经启动时用户可以在其中添加更多项目,您应该使用 Subjects
Queue<Item> items = new LinkedList<>();
PublishSubject<Item> subject = PublishSubject.create();
...
//subscribe once and supply new items in onClick
subject.subscribeOn(Schedulers.io())
.flatMap(ItemUtils::doSomeHeavyProcessing)
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
...
public void onClick() {
while(!queue.isEmpty()){
subject.onNext(queue.remove());
}
}