如何一直观察对象直到 onError() / 取消订阅

How to keep observing an object until onError() / unsubscribing

我需要观察对象列表,并在列表中有新对象时立即发出事件。类似于客户端-服务器应用程序,其中客户端将内容添加到列表,服务器将新添加的内容发送给在服务器中注册的所有客户端。

到目前为止,我的 Observable 只发射了一项,之后就没有了。换句话说,当一个新对象被添加到列表中时有一个事件,但是当第二个(或第三个、第四个、第五个...)对象被添加时没有事件。

我需要它在列表中有新对象时继续发出项目。

这是我的代码,其中 MyList 是我自己实现的 List(与我的问题无关)。我希望发出的事件成为 MyList.

中新添加的对象
private void onClickMethod() {
    MyList myList = populateMyList();

    onNexAction = new Action1<MyList>() {
        @Override
        public void call(MyList myList) {
            System.out.println("call()");
            TheObject theObject = myList.getNext();
            System.out.println("New Event: " + theObject.getData());
        }
    };

    myObservable = Observable.create(new Observable.OnSubscribe<MyList>() {
        @Override
        public void call(Subscriber<? super MyList> t) {
            t.onNext(myList);
        }
    });

    myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).subscribe(onNexAction);
}

上面的代码只会发出第一个添加的对象。但是,如果我在 call() 中放置一个 while 循环,它将发出所有事件,就像我想要的那样。但我觉得这不是做我想做的事情的合适方法。我觉得我更喜欢轮询而不是异步。

这是完全相同的代码,但带有 while 循环:

private void onClickMethod() {
    MyList myList = populateMyList();

    onNexAction = new Action1<MyList>() {
        @Override
        public void call(MyList myList) {
            System.out.println("call()");
            while (myList.size() > 0) { // The while cycle
                TheObject theObject = myList.getNext();
                System.out.println("New Event: " + theObject.getData());
            }
        }
    };

    myObservable = Observable.create(new Observable.OnSubscribe<MyList>() {
        @Override
        public void call(Subscriber<? super MyList> t) {
            t.onNext(myList);
        }
    });

    myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).subscribe(onNexAction);
}

那么,我做错了什么?一旦 MyList 中有新对象,如何让我的 observable 发出新事件?

编辑:

根据 Tassos Bassoukos 的建议,我应用了 distinct() 运算符,如下所示:

myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).distinct().subscribe(onNexAction);

但这并没有解决我的问题。仅发射第一个添加的对象。

根据您的代码,当用户单击(某物)时,将调用 onClickMethod

此方法将创建一个对象 myList,您将构建一个将发出此对象的项目。然后你会消耗这个对象。

事实上,您只需发出一次列表,这就是为什么您只收到一次通知(一次点击)

要实现您要查找的内容,一种可能的方法是在您添加列表项时发出列表中的每一项。

private void onClickMethod() {
     MyList myList = populateMyList();

     myObservable = Observable.create(subscriber -> {
               int currentSize = myList.size();
               while(myList.size() != currentSize) {
                    subscriber.onNext(myList.getNext());
                    currentSize = myList.size();
               }
               subscriber.onCompleted();
     });

   myObservable.observeOn(Schedulers.newThread())
               .subscribeOn(Schedulers.io())
               .subscribe(item -> System.out.println(item));

}

请注意,此代码 高效,因为它会循环 myList.size() 并且如果在循环代码期间有人 add/remove 项,您可能会遇到问题.

更好的方法是在 myList 中添加新项目时收到通知,然后发出这个新项目。

首先,您没有从 observable 中获得多个值的原因是您传递给 Observable.create 的方法仅被调用 一次,并且首次订阅时发生。如果你只想把你的列表作为一个可观察的连接起来,最简单的方法是让 MyList class 实现 Iterable 并且你可以直接使用 from 而不是

private void onClickMethod() {
 MyList myList = populateMyList();
 Observable.from(myList)
           .observeOn(Schedulers.newThread())
           .subscribe(item -> System.out.println(item));

}

这是PublishSubject的典型案例:

class MyList<T> {
    final PublishSubject<T> onAdded = PublishSubject.create();
    void add(T value) {
        // add internally
        onAdded.onNext(value);
    }
    Observable<T> onAdded() {
        return onAdded;
    }
}

MyList<Integer> list = populate();

Subscription s = list.onAdded()
    .subscribe(v -> System.out.println("Added: " + v));

list.add(1);  // prints "Added: 1"
list.add(2);  // prints "Added: 2"
list.add(3);  // prints "Added: 3"

s.unsubscribe(); // not interested anymore

list.add(4);  // doesn't print anything
list.add(5);  // doesn't print anything