使用 Rx 更新金额
Update amount using Rx
我有 2 个 Subject
(plusOne[itemId], minusOne[itemId]) 发出项目 ID,其数量应该改变一个(添加或减去)。
用户可以使用相同的项目 ID 发送多个信号。最后,我想订阅一个 Observable
会发出 Pair[itemId,amountToSet]
UI 看起来像一个项目行列表,每个行包含触发相应 onNext(itemId) 的“+”和“-”按钮。
我曾尝试通过对 itemid 进行分组并减少顺序来解决问题,但问题是 reduce
仅在调用 onCompleted
时才会启动,而我的主题不应该完成排放。
这是无法使用 reduce 的代码:
plusItem.asObservable().groupBy(id -> id).subscribe(new Action1<GroupedObservable<Long, Long>>() {
@Override
public void call(GroupedObservable<Long, Long> groupedObservable) {
System.out.println("Composed a group with key: " + groupedObservable.getKey());
groupedObservable
.map(id -> 1)
.startWith(0)
.reduce((integer, integer2) -> integer + integer2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer sum) {
System.out.println(Integer.toString(sum));
}
});
}
});
解决方案
private Observable<Pair<Long,Integer>> getUpdateObservable(
Observable clickSource,
Observable<Map<Long,Integer>> initValues,
Func1<Long,Integer> groupMapFunc
) {
return clickSource
.asObservable()
.groupBy(id -> id)
.flatMap(new Func1<GroupedObservable<Long, Long>, Observable<Pair<Long, Integer>>>() {
@Override
public Observable<Pair<Long, Integer>> call(GroupedObservable<Long, Long> groupedObservable) {
return groupedObservable
.map(groupMapFunc)
.scan((integer, integer2) -> integer + integer2)
.withLatestFrom(initValues, new Func2<Integer, Map<Long, Integer>, Integer>() {
@Override
public Integer call(Integer integer, Map<Long, Integer> basket) {
return basket.get(groupedObservable.getKey()) + integer;
}
})
.map(new Func1<Integer, Pair<Long, Integer>>() {
@Override
public Pair<Long, Integer> call(Integer integer) {
return new Pair<>(groupedObservable.getKey(), integer);
}
});
}
});
}
你有什么建议吗?
谢谢!
您可以使用scan
将return中间值
plusItem
.asObservable()
.groupBy(id -> id)
.subscribe(new Action1<GroupedObservable<Long, Long>>() {
@Override
public void call(GroupedObservable<Long, Long> groupedObservable) {
System.out.println("Composed a group with key: " + groupedObservable.getKey());
groupedObservable
.map(id -> 1)
.scan(0, (integer, integer2) -> integer + integer2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer sum) {
System.out.println(Integer.toString(sum));
}
});
}
});
我有 2 个 Subject
(plusOne[itemId], minusOne[itemId]) 发出项目 ID,其数量应该改变一个(添加或减去)。
用户可以使用相同的项目 ID 发送多个信号。最后,我想订阅一个 Observable
会发出 Pair[itemId,amountToSet]
UI 看起来像一个项目行列表,每个行包含触发相应 onNext(itemId) 的“+”和“-”按钮。
我曾尝试通过对 itemid 进行分组并减少顺序来解决问题,但问题是 reduce
仅在调用 onCompleted
时才会启动,而我的主题不应该完成排放。
这是无法使用 reduce 的代码:
plusItem.asObservable().groupBy(id -> id).subscribe(new Action1<GroupedObservable<Long, Long>>() {
@Override
public void call(GroupedObservable<Long, Long> groupedObservable) {
System.out.println("Composed a group with key: " + groupedObservable.getKey());
groupedObservable
.map(id -> 1)
.startWith(0)
.reduce((integer, integer2) -> integer + integer2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer sum) {
System.out.println(Integer.toString(sum));
}
});
}
});
解决方案
private Observable<Pair<Long,Integer>> getUpdateObservable(
Observable clickSource,
Observable<Map<Long,Integer>> initValues,
Func1<Long,Integer> groupMapFunc
) {
return clickSource
.asObservable()
.groupBy(id -> id)
.flatMap(new Func1<GroupedObservable<Long, Long>, Observable<Pair<Long, Integer>>>() {
@Override
public Observable<Pair<Long, Integer>> call(GroupedObservable<Long, Long> groupedObservable) {
return groupedObservable
.map(groupMapFunc)
.scan((integer, integer2) -> integer + integer2)
.withLatestFrom(initValues, new Func2<Integer, Map<Long, Integer>, Integer>() {
@Override
public Integer call(Integer integer, Map<Long, Integer> basket) {
return basket.get(groupedObservable.getKey()) + integer;
}
})
.map(new Func1<Integer, Pair<Long, Integer>>() {
@Override
public Pair<Long, Integer> call(Integer integer) {
return new Pair<>(groupedObservable.getKey(), integer);
}
});
}
});
}
你有什么建议吗?
谢谢!
您可以使用scan
将return中间值
plusItem
.asObservable()
.groupBy(id -> id)
.subscribe(new Action1<GroupedObservable<Long, Long>>() {
@Override
public void call(GroupedObservable<Long, Long> groupedObservable) {
System.out.println("Composed a group with key: " + groupedObservable.getKey());
groupedObservable
.map(id -> 1)
.scan(0, (integer, integer2) -> integer + integer2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer sum) {
System.out.println(Integer.toString(sum));
}
});
}
});