take(n) 在 RxJava2 中的 groupBy 之后无效

take(n) doesn't have effect after groupBy in RxJava2

我正在尝试按名称对多个模型实例进行分组,然后使用 take(n) 在每组中仅获取某些项目,但不知何故 take 对 GroupedObservable 没有影响。这是代码

假设这包含一个包含 10 个项目的列表,其中 5 个的名称为“apple”,另外 5 个的名称为“pear[=23=” ]"

Observable<Item> items....

Observable<Item> groupedItems = items.groupBy(Item::name)
.flatMap(it -> it.take(2));

所以我想象 groupedItems 必须发出 2 "apples" 和 2 "pears",但它拥有所有这些。

有什么地方我弄错了吗,我需要换个方式吗?

当再次遇到相同的键时,将重新创建已取消的组。您需要确保该组没有停止,并且您必须以某种方式忽略更多项目:

source.groupBy(func)
.flatMap(group -> 
    group.publish(p -> p.take(5).mergeWith(p.ignoreElements()))
);