Rx 中的 groupBy、过滤器和内存泄漏
groupBy, filter and memory leak in Rx
根据groupBy
的文档:
Note: A GroupedObservable
will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservable
s that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like take(int)(0)
to them.
有一个 RxJava tutorial 说:
Internally, every Rx operator does 3 things
- It subscribes to the source and observes the values.
- It transforms the observed sequence according to the operator's purpose.
- It pushes the modified sequence to its own subscribers, by calling onNext, onError and onCompleted.
让我们看一下下面的代码块,它只从 range(0, 10)
中提取偶数:
Observable.range(0, 10)
.groupBy(i -> i % 2)
.filter(g -> g.getKey() % 2 == 0)
.flatMap(g -> g)
.subscribe(System.out::println, Throwable::printStackTrace);
我的问题是:
这是否意味着 filter
运算符已经暗示订阅由 groupBy
或仅 Observable<GroupedObservable>
产生的每个组?
这种情况下会不会有内存泄漏?如果是这样,
如何正确丢弃那些组?将 filter
替换为自定义的 take(0)
后跟 return Observable.empty()
?你可能会问为什么我不直接 return take(0)
:这是因为 filter
不一定紧跟在 groupBy
之后,而是可以在链中的任何位置并涉及更复杂的条件。
您的怀疑是正确的,因为要正确处理分组的可观察对象,必须订阅每个内部可观察对象 (g
)。由于 filter
仅订阅外部可观察对象,这是一个坏主意。只需在 flatMap
中执行您需要的操作,使用 ignoreElements
过滤掉不需要的组。
Observable.range(0, 10)
.groupBy(i -> i % 2)
.flatMap(g -> {
if (g.getKey() % 2 == 0)
return g;
else
return g.ignoreElements();
})
.subscribe(System.out::println, Throwable::printStackTrace);
除了内存泄漏,当前的实现可能由于内部请求协调问题而最终完全挂起。
请注意,使用 take(0)
,可能会一直重新创建组。我会使用 ignoreElements
来降低值,没有项目到达 flatMap
并且不会一直重新创建组本身。
根据groupBy
的文档:
Note: A
GroupedObservable
will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore thoseGroupedObservable
s that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator liketake(int)(0)
to them.
有一个 RxJava tutorial 说:
Internally, every Rx operator does 3 things
- It subscribes to the source and observes the values.
- It transforms the observed sequence according to the operator's purpose.
- It pushes the modified sequence to its own subscribers, by calling onNext, onError and onCompleted.
让我们看一下下面的代码块,它只从 range(0, 10)
中提取偶数:
Observable.range(0, 10)
.groupBy(i -> i % 2)
.filter(g -> g.getKey() % 2 == 0)
.flatMap(g -> g)
.subscribe(System.out::println, Throwable::printStackTrace);
我的问题是:
这是否意味着
filter
运算符已经暗示订阅由groupBy
或仅Observable<GroupedObservable>
产生的每个组?这种情况下会不会有内存泄漏?如果是这样,
如何正确丢弃那些组?将
filter
替换为自定义的take(0)
后跟return Observable.empty()
?你可能会问为什么我不直接 returntake(0)
:这是因为filter
不一定紧跟在groupBy
之后,而是可以在链中的任何位置并涉及更复杂的条件。
您的怀疑是正确的,因为要正确处理分组的可观察对象,必须订阅每个内部可观察对象 (g
)。由于 filter
仅订阅外部可观察对象,这是一个坏主意。只需在 flatMap
中执行您需要的操作,使用 ignoreElements
过滤掉不需要的组。
Observable.range(0, 10)
.groupBy(i -> i % 2)
.flatMap(g -> {
if (g.getKey() % 2 == 0)
return g;
else
return g.ignoreElements();
})
.subscribe(System.out::println, Throwable::printStackTrace);
除了内存泄漏,当前的实现可能由于内部请求协调问题而最终完全挂起。
请注意,使用 take(0)
,可能会一直重新创建组。我会使用 ignoreElements
来降低值,没有项目到达 flatMap
并且不会一直重新创建组本身。