当 Rx Observable 发生变化时从 Rx Maybe 获取最新数据

Fetching latest data from Rx Maybe when Rx Observable changes

我有一个用例,我想将来自 Observable<M> 的数据与来自 Maybe<N> 的最新值组合起来。 API 和 Maybe 是我无法控制的外部 API。实际上,我想做的是在 Observable 更改时从 Maybe 中延迟获取值,并使用函数组合它们。

为简单起见,我们假设 MIntegerNString。我尝试编写以下内容:

public interface SomeExternalApi {
  Maybe<String> getLatestString();
}

public interface SomeInternalApi {
  Observable<Integer> getIntegerObservable();
}

public class MyClass {

  private final SomeExternalApi externalApi;
  private final SomeInternalApi internalApi;

  public Disposable subscribe(Consumer<? super IntegerAndString> observer) {
    return internalApi
        .getIntegerObservable()
        .withLatestFrom(
            externalApi.getLatestString().toObservable(),
            integerData, stringData -> new IntegerAndString(integerData, stringData)
        .subscribe(observer); 
  }

  private static class IntegerAndString {
    // data class that holds onto both
  } 
}

我认为这里的问题是 externalApi.getLatestString() 生成的 Observable 被调用时的值冻结了。因此,无论从 Maybe<String> 返回的原始值是什么,它都会继续发送 IntegerAndString

有没有一种方法可以编写一个 Observable,在调用时从 Maybe 或其他我可以在这里使用的模式中延迟获取数据?

Maybe<T> 根据定义最多发出一个元素。

我想你基本上是想重复调用 externalApi.getLatestString()。我还没有测试过这个,但你可以在这里尝试使用 Observable.defer(...)

      public Disposable subscribe(Consumer<? super IntegerAndString> observer) {
        return internalApi
            .getIntegerObservable()
            .withLatestFrom( Observable.defer(
                () -> externalApi.getLatestString().toObservable() ),
                integerData, stringData -> new IntegerAndString(integerData, stringData)
            .subscribe(observer));
      }