我如何在 RxJava 中的动态列表上创建观察者?

How can I create an Observer over a dynamic list in RxJava?

我需要在不断变化(添加元素)的数组上创建一个 Observer。

我正在使用 Obserable.from(Iterable),但它似乎在 ArrayList 上创建了 Observable,就像创建时一样。

我需要通知 Observer 并在 ArrayList 每次添加新元素时执行 Action。

给你。感谢 RxJava 上的 Dávid Karnok Google Group

import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.subjects.PublishSubject;

public class ObservableListExample {

    public static class ObservableList<T> {

        protected final List<T> list;
        protected final PublishSubject<T> onAdd;

        public ObservableList() {
            this.list = new ArrayList<T>();
            this.onAdd = PublishSubject.create();
        }
        public void add(T value) {
            list.add(value);
            onAdd.onNext(value);
        }
        public Observable<T> getObservable() {
            return onAdd;
        }
    }

    public static void main(String[] args) throws Exception {
        ObservableList<Integer> olist = new ObservableList<>();

        olist.getObservable().subscribe(System.out::println);

        olist.add(1);
        Thread.sleep(1000);
        olist.add(2);
        Thread.sleep(1000);
        olist.add(3);
    }
}

您可以将两个可观察对象合并为一个。 其中之一可以是元素的初始列表,第二个可以是主题:

import rx.Observable;
import rx.subjects.ReplaySubject;
import java.util.ArrayList;
import java.util.List;

public class ExampleObservableList {
    public static void main(String[] args) {
        List<Integer> initialNumbers = new ArrayList<Integer>();
        initialNumbers.add(1);
        initialNumbers.add(2);

        Observable<Integer> observableInitial = Observable.from(initialNumbers);
        ReplaySubject<Integer> subject = ReplaySubject.create();

        Observable<Integer> source = Observable.merge(observableInitial, subject);

        source.subscribe(System.out::println);

        for (int i = 0; i < 100; ++i) {
            subject.onNext(i);
        }
    }

}

如果您没有初始元素,您只能使用 ReplaySubject(或其他 Subject -> 请参阅 http://reactivex.io/documentation/subject.html):

public static void main(String[] args) {
    ReplaySubject<Integer> source = ReplaySubject.create();
    source.subscribe(System.out::println);

    for (int i = 0; i < 100; ++i) {
        source.onNext(i);
    }
}

我会考虑这种基于 BehaviourSubject 的方法。 这与 juanpavergara 的解决方案不同,在订阅 Observable 时,onNext() 将立即发送给 Observer。

public class ObservableList<T> {

    protected final List<T> list;
    protected final BehaviorSubject<List<T>> behaviorSubject;

    public ObservableList(List<T> list) {
        this.list = list;
        this.behaviorSubject = BehaviorSubject.create(list);
    }

    public Observable<List<T>> getObservable() {
        return behaviorSubject;
    }

    public void add(T element) {
        list.add(element);
        behaviorSubject.onNext(list);
    }
}


private void main() {
    final List<Integer> list = new ArrayList<>();
    list.add(0);
    list.add(1);

    final ObservableList<Integer> olist = new ObservableList<>(list);

    olist.getObservable().subscribe(System.out::println);

    olist.add(2);
    olist.add(3);
}

这个解决方案在实现 MVP 时可能很有用,当你想观察系统中一个组件(即:一个存储库或数据源)返回的一个资源(即:对象列表),并且你想要 Observer (即:Presenter 或 Interactor)在将元素添加到系统另一部分的列表时收到通知。

您好,您可以使用 ObservableRxList 来观察 addAll、添加、删除和更新列表

public class ObservableRxList<T> {
    protected final List<T> list;
    protected final PublishSubject<List<T>> subject;

     public ObservableRxList() {
         this.list = new ArrayList<T>();
         this.subject = PublishSubject.create();
     }

     public void add(T value) {
         list.add(value);
         subject.onNext(list);
     }

     public void addAll(List<T> value) {
         list.addAll(value);
         subject.onNext(list);
     }

     //not sure about this
     public void update(T value) {
         for (ListIterator<T> it = list.listIterator(); it.hasNext(); ) {
             if (value == it.next()) {
                 it.set(value);
                 break;
             }
         }
         subject.onNext(list);
     }

     public void update(int position, T value) {
         list.set(position, value);
         subject.onNext(list);
     }

     public void remove(T value) {
         list.remove(value);
         subject.onNext(list);
     }

     public void remove(int index) {
         list.remove(index);
         subject.onNext(list);
     }

     public Observable<List<T>> getObservable() {
         return subject;
     }

     public List<T> getCurrentList() {
         return list;
     }
 }

用法: 每次使用 addAll、添加、删除或更新此观察者都会触发整个更新列表

mObservableRxList.getObservable().subscribe(productList-> {
    this.products.clear();
    this.products.addAll(productList;
    productAdapter.notifyDataSetChanged();
});