RxJava:查明 BehaviorSubject 是否是重复值

RxJava: Find out if BehaviorSubject was a repeated value or not

我正在制作一个 Android 界面,显示从网络获取的一些数据。我想让它显示最新的可用数据,并且永远不会为空(除非还没有获取任何数据)所以我使用 BehaviorSubject 为订阅者(我的 UI)提供最新的可用信息,在后台刷新它以更新它。

这行得通,但由于我的 UI 中的另一个要求,我现在必须知道发布的结果是否是从网络上获取的。 (换句话说,我需要知道发布的结果是否是BehaviorSubject的保存项。)

我怎样才能做到这一点?如果我需要将它拆分成多个 Observables,那很好,只要我能够获得 BehaviorSubject 的缓存行为(获得最后可用的结果),同时还能判断返回的结果是来自缓存还是不是。我能想到的一种 hacky 方法是检查响应的时间戳是否相对较快,但这真的很草率,我宁愿想出一种用 RxJava 来做的方法。

将BehaviorSubject 对象的信息存储在一个易于查找的数据结构中,例如Dictionnary。每个值都是一个键,值就是迭代次数。

因此,当您查看某个特定的键时,如果您的字典已经包含它并且它的值已经为 1,那么您就知道该值是一个重复值。

Distinct 不涵盖您的情况吗? BehaviorSubject 只重复订阅后的最新元素。

我相信你想要的是这样的:

private final BehaviorSubject<T> fetched = BehaviorSubject.create();
private final Observable<FirstTime<T>> _fetched = fetched.lift(new Observable.Operator<FirstTime<T>, T>() {
    private AtomicReference<T> last = new AtomicReference<>();
    @Override
    public Subscriber<? super T> call(Subscriber<? super FirstTime<T>> child) {
        return new Subscriber<T>(child) {
            @Override
            public void onCompleted() {
                child.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onNext(T t) {
                if (!Objects.equals(t, last.getAndSet(t))) {
                    child.onNext(FirstTime.yes(t));
                } else {
                    child.onNext(FirstTime.no(t));
                }
            }
        };
    }
});

public Observable<FirstTime<T>> getObservable() {
    return _fetched;
}

public static class FirstTime<T> {
    final boolean isItTheFirstTime;
    final T value;

    public FirstTime(boolean isItTheFirstTime, T value) {
        this.isItTheFirstTime = isItTheFirstTime;
        this.value = value;
    }

    public boolean isItTheFirstTime() {
        return isItTheFirstTime;
    }

    public T getValue() {
        return value;
    }

    public static <T> FirstTime<T> yes(T value) {
        return new FirstTime<>(true, value);
    }

    public static <T> FirstTime<T> no(T value) {
        return new FirstTime<>(false, value);
    }
}

包装器 class FirstTime 有一个布尔值,可用于查看 Observable 的任何订阅者之前是否看到过它。

希望对您有所帮助。

正如您在问题中提到的,这可以通过多个 Observable 来实现。本质上,您有两个 Observable:“可以观察到新的响应”,以及“可以观察到缓存的响应”。如果某个东西可以是 "observed",你可以将它表示为一个 Observable。让我们命名第一个 original 和第二个 replayed.

看到这个JSBin(Java脚本但是概念可以直接翻译成Java。据我所知没有JavaBin,出于这些目的)。

var original = Rx.Observable.interval(1000)
  .map(function (x) { return {value: x, from: 'original'}; })
  .take(4)
  .publish().refCount();

var replayed = original
  .map(function (x) { return {value: x.value, from: 'replayed'}; })
  .replay(null, 1).refCount();

var merged = Rx.Observable.merge(original, replayed)
  .replay(null, 1).refCount()
  .distinctUntilChanged(function (obj) { return obj.value; });

console.log('subscribe 1st');
merged.subscribe(function (x) { 
  console.log('subscriber1: value ' + x.value + ', from: ' + x.from);
});

setTimeout(function () {
  console.log('    subscribe 2nd');
  merged.subscribe(function (x) {
    console.log('    subscriber2: value ' + x.value + ', from: ' + x.from);
  });
}, 2500);

这里的总体思路是:使用指示事件来源的字段 from 对事件进行注释。如果是original,就是一个新鲜的回应。如果它是 replayed,它是一个缓存的响应。 Observable original 只会发出 from: 'original' 而 Observable replayed 只会发出 from: 'replayed'。在 Java 中,我们需要更多的样板文件,因为您需要制作一个 class 来表示这些带注释的事件。否则可以在 RxJava.

中找到 RxJS 中的相同运算符

原始的 Observable 是 publish().refCount() 因为我们只希望这个流的一个实例与所有观察者共享。事实上在 RxJS 和 Rx.NET 中,share()publish().refCount().

的别名

重放的 Observable 是 replay(1).refCount() 因为它也像原始的一样共享,但是 replay(1) 给了我们缓存行为。

merged Observable 包含原始和重播,这是您应该向所有订阅者公开的内容。由于 replayed 会在 original 发出时立即发出,因此我们在事件值上使用 distinctUntilChanged 来忽略立即连续。我们 replay(1).refCount() 合并 的原因是因为我们希望原始和重播的合并也是所有观察者共享的流的一个共享实例。我们会为此目的使用 publish().refCount(),但我们不能失去 replayed 包含的重放效果,因此它是 replay(1).refCount(),而不是 publish().refCount()

我不太确定你想要达到什么目的。可能您只想拥有 "latest" 数据的智能来源和告诉您数据刷新时间的第二个来源?

    BehaviorSubject<Integer> dataSubject = BehaviorSubject.create(42); // initial value, "never empty"

    Observable<String> refreshedIndicator = dataSubject.map(data -> "Refreshed!");
    refreshedIndicator.subscribe(System.out::println);

    Observable<Integer> latestActualData = dataSubject.distinctUntilChanged();
    latestActualData.subscribe( data -> System.out.println( "Got new data: " + data));

    // simulation of background activity:
    Observable.interval(1, TimeUnit.SECONDS)
            .limit(100)
            .toBlocking()
            .subscribe(aLong -> dataSubject.onNext(ThreadLocalRandom.current().nextInt(2)));

输出:

Refreshed!
Got new data: 42
Refreshed!
Got new data: 0
Refreshed!
Refreshed!
Refreshed!
Got new data: 1
Refreshed!
Got new data: 0
Refreshed!
Got new data: 1