将 observable 列表压缩到另一个 Zip observable RxJava2
Zip list of observables into another Zip observable RxJava2
我正在尝试 zip
zip
Observables
的列表,但问题是我每次都只能从压缩的可观察对象中获得相同的值。我这样做的原因是从 ble 执行两个操作 1st reading index
和 2nd reading data
一定次数 - 在下面的示例中是 6 次。
不知道如何用 RxJava2
处理这个问题
这是代码片段
private Observable<Pair<byte[],byte[]>> getValueFromIndication(RxBleConnection rxBleConnection){
final PublishSubject<Boolean> unsubscribeSubject = PublishSubject.create();
return Observable.zip(
rxBleConnection.setupIndication(Data.INDEX,NotificationSetupMode.QUICK_SETUP).flatMap(it->it).takeUntil(unsubscribeSubject),
rxBleConnection.setupIndication(Data.DATA,NotificationSetupMode.QUICK_SETUP).flatMap(it->it).takeUntil(unsubscribeSubject),
(bytes, bytes2) -> {
unsubscribeSubject.onNext(true);
return Pair.create(bytes,bytes2);
}
);
}
从我的主流开始,我首先创建 Observables
的列表并将其压缩并传递给它
.flatMap(rxBleConnection -> {
List<Observable<Pair<byte[],byte[]>>> observableList = new ArrayList<>();
for(int i=0;i<6;i++){
//Creating list of observables so that 6 times this function gets fire
observableList.add(getValueFromIndication(rxBleConnection));
}
// Zipping Zipped list of observables
return Observable.zip(observableList,Data::OperationReadings);
}).subscribe(bytes->{
})
在这里,我总是在 Data::OperationReadings
中得到相同的值。目前,我得到了以下我不想要的数据。
每次相同的索引和值
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
预期数据如下
每次不同的索引和值
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [2] DATA [11,11,2,0,3,0]
INDEX [3] DATA [0,0,0,0,33,0]
INDEX [4] DATA [10,30,0,30,3,0]
INDEX [5] DATA [10,0,0,30,3,0]
INDEX [6] DATA [10,0,20,30,3,9]
您获得相同数据重复 6 次的原因是您同时订阅了个人 getValueFromIndication()
。实际上每个 Observable 运行 并行。您想要按顺序 运行 每个订阅。解决方案可能是替换它:
return Observable.zip(observableList,Data::OperationReadings);
与:
return Observable.concat(observableList) // we want to subscribe each Observable from list after the previous one will complete
.toList() // we want to gather all results from individual Observables from the list — this returns a Single
.toObservable() // get back to the Observable class so the types will match
.map(Data::OperationReadings); // we map it into the OperationReadings class
我正在尝试 zip
zip
Observables
的列表,但问题是我每次都只能从压缩的可观察对象中获得相同的值。我这样做的原因是从 ble 执行两个操作 1st reading index
和 2nd reading data
一定次数 - 在下面的示例中是 6 次。
不知道如何用 RxJava2
这是代码片段
private Observable<Pair<byte[],byte[]>> getValueFromIndication(RxBleConnection rxBleConnection){
final PublishSubject<Boolean> unsubscribeSubject = PublishSubject.create();
return Observable.zip(
rxBleConnection.setupIndication(Data.INDEX,NotificationSetupMode.QUICK_SETUP).flatMap(it->it).takeUntil(unsubscribeSubject),
rxBleConnection.setupIndication(Data.DATA,NotificationSetupMode.QUICK_SETUP).flatMap(it->it).takeUntil(unsubscribeSubject),
(bytes, bytes2) -> {
unsubscribeSubject.onNext(true);
return Pair.create(bytes,bytes2);
}
);
}
从我的主流开始,我首先创建 Observables
的列表并将其压缩并传递给它
.flatMap(rxBleConnection -> {
List<Observable<Pair<byte[],byte[]>>> observableList = new ArrayList<>();
for(int i=0;i<6;i++){
//Creating list of observables so that 6 times this function gets fire
observableList.add(getValueFromIndication(rxBleConnection));
}
// Zipping Zipped list of observables
return Observable.zip(observableList,Data::OperationReadings);
}).subscribe(bytes->{
})
在这里,我总是在 Data::OperationReadings
中得到相同的值。目前,我得到了以下我不想要的数据。
每次相同的索引和值
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
预期数据如下
每次不同的索引和值
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [2] DATA [11,11,2,0,3,0]
INDEX [3] DATA [0,0,0,0,33,0]
INDEX [4] DATA [10,30,0,30,3,0]
INDEX [5] DATA [10,0,0,30,3,0]
INDEX [6] DATA [10,0,20,30,3,9]
您获得相同数据重复 6 次的原因是您同时订阅了个人 getValueFromIndication()
。实际上每个 Observable 运行 并行。您想要按顺序 运行 每个订阅。解决方案可能是替换它:
return Observable.zip(observableList,Data::OperationReadings);
与:
return Observable.concat(observableList) // we want to subscribe each Observable from list after the previous one will complete
.toList() // we want to gather all results from individual Observables from the list — this returns a Single
.toObservable() // get back to the Observable class so the types will match
.map(Data::OperationReadings); // we map it into the OperationReadings class