Rx 中的 groupBy、过滤器和内存泄漏

groupBy, filter and memory leak in Rx

根据groupBy的文档:

Note: A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservables that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like take(int)(0) to them.

有一个 RxJava tutorial 说:

Internally, every Rx operator does 3 things

  1. It subscribes to the source and observes the values.
  2. It transforms the observed sequence according to the operator's purpose.
  3. It pushes the modified sequence to its own subscribers, by calling onNext, onError and onCompleted.

让我们看一下下面的代码块,它只从 range(0, 10) 中提取偶数:

Observable.range(0, 10)
        .groupBy(i -> i % 2)
        .filter(g -> g.getKey() % 2 == 0)
        .flatMap(g -> g)
        .subscribe(System.out::println, Throwable::printStackTrace);

我的问题是:

  1. 这是否意味着 filter 运算符已经暗示订阅由 groupBy 或仅 Observable<GroupedObservable> 产生的每个组?

  2. 这种情况下会不会有内存泄漏?如果是这样,

  3. 如何正确丢弃那些组?将 filter 替换为自定义的 take(0) 后跟 return Observable.empty()?你可能会问为什么我不直接 return take(0):这是因为 filter 不一定紧跟在 groupBy 之后,而是可以在链中的任何位置并涉及更复杂的条件。

您的怀疑是正确的,因为要正确处理分组的可观察对象,必须订阅每个内部可观察对象 (g)。由于 filter 仅订阅外部可观察对象,这是一个坏主意。只需在 flatMap 中执行您需要的操作,使用 ignoreElements 过滤掉不需要的组。

Observable.range(0, 10)
    .groupBy(i -> i % 2)
    .flatMap(g -> {
       if (g.getKey() % 2 == 0) 
         return g;
       else 
         return g.ignoreElements();
    })
    .subscribe(System.out::println, Throwable::printStackTrace);

除了内存泄漏,当前的实现可能由于内部请求协调问题而最终完全挂起。

请注意,使用 take(0),可能会一直重新创建组。我会使用 ignoreElements 来降低值,没有项目到达 flatMap 并且不会一直重新创建组本身。