如何正确理解 RxJava 的 groupBy 行为?
How to correctly understand behavior of RxJava's groupBy?
总的来说,我对 RxJava 和 FP 还很陌生。我想写一个代码来加入两个 Observable
s。假设我们有两组整数:
[0..4]
键选择器作为 2
的模数,给出 (key, value) = {(0,0), (1,1), (0,2),...}
[0..9]
键选择器作为 [=20= 的模数],给出 (key, value) = {(0,0), (1,1), (2,2), (0,3), (1,4),...}
我加入他们的步骤如下:
- 按键对每个集合进行分组。第一组使用键
0
和 1
创建两个组。第二个用键 0
、1
和 2
. 创建三个组
- 对两组组进行笛卡尔积,总共给出6对组,键为:
0-0
、0-1
、0-2
、1-0
、1-1
, 1-2
.
- 只过滤那些两边有相同键的对,只留下
0-0
和1-1
.
- 在每对中,对左右组进行笛卡尔积。
下面是计算笛卡尔积的助手class:
public class Cross<TLeft, TRight, R> implements Observable.Transformer<TLeft, R> {
private Observable<TRight> _right;
private Func2<TLeft, TRight, R> _resultSelector;
public Cross(Observable<TRight> right, Func2<TLeft, TRight, R> resultSelector) {
_right = right;
_resultSelector = resultSelector;
}
@Override
public Observable<R> call(Observable<TLeft> left) {
return left.flatMap(l -> _right.map(r -> _resultSelector.call(l, r)));
}
}
这是加入的代码:
Observable.range(0, 5).groupBy(i -> i % 2)
.compose(new Cross<>(Observable.range(0, 10).groupBy(i -> i % 3), ImmutablePair::new))
.filter(pair -> pair.left.getKey().equals(pair.right.getKey()))
.flatMap(pair -> pair.left.compose(new Cross<>(pair.right, ImmutablePair::new)))
.subscribe(System.out::println);
但是,输出不正确:
(0,0)
(0,3)
(0,6)
(0,9)
(1,1)
(1,4)
(1,7)
如果我删除包含 filter
的行,将不会有任何结果。正确的输出应该是 运行 这样的:
Observable.range(0, 5)
.compose(new Cross<>(Observable.range(0, 10), ImmutablePair::new))
.filter(pair -> pair.left % 2 == pair.right % 3)
.subscribe(System.out::println);
给出:
(0,0)
(0,3)
(0,6)
(0,9)
(1,1)
(1,4)
(1,7)
(2,0)
(2,3)
(2,6)
(2,9)
(3,1)
(3,4)
(3,7)
(4,0)
(4,3)
(4,6)
(4,9)
有人可以解释这种行为吗?非常感谢。
注意:我使用 org.apache.commons.lang3.tuple.ImmutablePair
以防你想知道。
问题是此设置尝试多次订阅一个组,这是不允许的。您会看到 subscribe(System.out::println, Throwable::printStackTrace);
重载的异常,始终建议在另一个上使用它。这是固定示例,允许以另一层 ImmutablePair 为代价进行重用:
Func1<Integer, Integer> m2 = i -> i % 2;
Func1<Integer, Integer> m3 = i -> i % 3;
Observable<ImmutablePair<Integer, Observable<Integer>>> g2 =
Observable.range(0, 5).groupBy(m2).map(g -> new ImmutablePair<>(g.getKey(), g.cache()));
Observable<ImmutablePair<Integer, Observable<Integer>>> g3 =
Observable.range(0, 10).groupBy(m3).map(g -> new ImmutablePair<>(g.getKey(), g.cache()));
Observable<ImmutablePair<ImmutablePair<Integer, Observable<Integer>>, ImmutablePair<Integer, Observable<Integer>>>> x1
= g2.compose(new Cross<>(g3, ImmutablePair::new));
Observable<ImmutablePair<ImmutablePair<Integer, Observable<Integer>>, ImmutablePair<Integer, Observable<Integer>>>> x2
= x1.filter(pair -> pair.left.getKey().equals(pair.right.getKey()));
Observable<ImmutablePair<Integer, Integer>> o = x2.flatMap(pair ->
pair.left.right.compose(new Cross<>(pair.right.right, ImmutablePair::new)));
o.subscribe(System.out::println, Throwable::printStackTrace);
(我对长类型感到抱歉,如果我尝试内联它们而不是使用局部变量,Eclipse 会出现各种推理问题)
总的来说,我对 RxJava 和 FP 还很陌生。我想写一个代码来加入两个 Observable
s。假设我们有两组整数:
[0..4]
键选择器作为2
的模数,给出(key, value) = {(0,0), (1,1), (0,2),...}
[0..9]
键选择器作为 [=20= 的模数],给出(key, value) = {(0,0), (1,1), (2,2), (0,3), (1,4),...}
我加入他们的步骤如下:
- 按键对每个集合进行分组。第一组使用键
0
和1
创建两个组。第二个用键0
、1
和2
. 创建三个组
- 对两组组进行笛卡尔积,总共给出6对组,键为:
0-0
、0-1
、0-2
、1-0
、1-1
,1-2
. - 只过滤那些两边有相同键的对,只留下
0-0
和1-1
. - 在每对中,对左右组进行笛卡尔积。
下面是计算笛卡尔积的助手class:
public class Cross<TLeft, TRight, R> implements Observable.Transformer<TLeft, R> {
private Observable<TRight> _right;
private Func2<TLeft, TRight, R> _resultSelector;
public Cross(Observable<TRight> right, Func2<TLeft, TRight, R> resultSelector) {
_right = right;
_resultSelector = resultSelector;
}
@Override
public Observable<R> call(Observable<TLeft> left) {
return left.flatMap(l -> _right.map(r -> _resultSelector.call(l, r)));
}
}
这是加入的代码:
Observable.range(0, 5).groupBy(i -> i % 2)
.compose(new Cross<>(Observable.range(0, 10).groupBy(i -> i % 3), ImmutablePair::new))
.filter(pair -> pair.left.getKey().equals(pair.right.getKey()))
.flatMap(pair -> pair.left.compose(new Cross<>(pair.right, ImmutablePair::new)))
.subscribe(System.out::println);
但是,输出不正确:
(0,0)
(0,3)
(0,6)
(0,9)
(1,1)
(1,4)
(1,7)
如果我删除包含 filter
的行,将不会有任何结果。正确的输出应该是 运行 这样的:
Observable.range(0, 5)
.compose(new Cross<>(Observable.range(0, 10), ImmutablePair::new))
.filter(pair -> pair.left % 2 == pair.right % 3)
.subscribe(System.out::println);
给出:
(0,0)
(0,3)
(0,6)
(0,9)
(1,1)
(1,4)
(1,7)
(2,0)
(2,3)
(2,6)
(2,9)
(3,1)
(3,4)
(3,7)
(4,0)
(4,3)
(4,6)
(4,9)
有人可以解释这种行为吗?非常感谢。
注意:我使用 org.apache.commons.lang3.tuple.ImmutablePair
以防你想知道。
问题是此设置尝试多次订阅一个组,这是不允许的。您会看到 subscribe(System.out::println, Throwable::printStackTrace);
重载的异常,始终建议在另一个上使用它。这是固定示例,允许以另一层 ImmutablePair 为代价进行重用:
Func1<Integer, Integer> m2 = i -> i % 2;
Func1<Integer, Integer> m3 = i -> i % 3;
Observable<ImmutablePair<Integer, Observable<Integer>>> g2 =
Observable.range(0, 5).groupBy(m2).map(g -> new ImmutablePair<>(g.getKey(), g.cache()));
Observable<ImmutablePair<Integer, Observable<Integer>>> g3 =
Observable.range(0, 10).groupBy(m3).map(g -> new ImmutablePair<>(g.getKey(), g.cache()));
Observable<ImmutablePair<ImmutablePair<Integer, Observable<Integer>>, ImmutablePair<Integer, Observable<Integer>>>> x1
= g2.compose(new Cross<>(g3, ImmutablePair::new));
Observable<ImmutablePair<ImmutablePair<Integer, Observable<Integer>>, ImmutablePair<Integer, Observable<Integer>>>> x2
= x1.filter(pair -> pair.left.getKey().equals(pair.right.getKey()));
Observable<ImmutablePair<Integer, Integer>> o = x2.flatMap(pair ->
pair.left.right.compose(new Cross<>(pair.right.right, ImmutablePair::new)));
o.subscribe(System.out::println, Throwable::printStackTrace);
(我对长类型感到抱歉,如果我尝试内联它们而不是使用局部变量,Eclipse 会出现各种推理问题)