检查一个特定事件后跟另一个事件并使用 RxJava 发出成功
Check one specific event is followed by another one and emit success using RxJava
我需要检查一个无限可观察对象(来自设备的事件)是否发出一个特定的事件,我们称它为“开始”,然后是另一个“完成”。然而,在这两个事件之间,可以接收任意数量的不同事件,并且必须忽略它们。结果应该是 Completable.complete(),当“Started”事件后跟“Finished”事件之后在设置的超时前成功时,它是成功的。
对于这个问题我有一个有效的解决方案,但是它看起来很难看而且太复杂,我认为可能有更多 elegant/simple 的解决方案。我当前的代码看起来像这样,我已经概括了我的代码以便更容易理解,基本上在这个例子中我检查在 Flowable 发出数字“5”之后,在 10 秒超时之前接收到数字“8”:
Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
.publish().autoConnect(1);
return events
.filter(number -> number == 5)
.firstElement()
.concatMapCompletable(number -> {
if (number == 5) {
return events
.filter(number2 -> number2 == 8)
.firstElement()
.concatMapCompletable(number2 -> {
if (number2 == 8) {
return Completable.complete();
} else {
return Completable.error(new Exception("Number 3 expected, got " + number2));
}
});
} else {
return Completable.error(new Exception("Number 2 expected, got " + number));
}
})
.timeout(10, TimeUnit.SECONDS, Completable.error(new Exception("Timeout!")));
编辑:
我找到了一个更干净的版本,但是它看起来很奇怪,因为我使用 .filter 运算符然后在收到的第一个元素上完成,我 post 在下面作为参考:
Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
.publish().autoConnect(1);
TestObserver testObserver = events
.filter(number -> number == 5)
.firstElement()
.concatMapCompletable(number ->
events
.filter(number2 -> number2 == 8)
.firstElement()
.concatMapCompletable(number2 ->
Completable.complete()))
.timeout(10, TimeUnit.SECONDS, Completable.error(new Exception("Timeout!")))
.test();
更新 2:
我比较满意的版本:
Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
.publish().autoConnect(1);
TestObserver testObserver = events
.skipWhile(number -> number != 5)
.firstElement()
.flatMapCompletable(number -> Completable.fromObservable(events
.takeUntil(number2 -> number2 == 8)
.toObservable()
));
我不确定你到底想做什么,但你可以使用 buffer
或 window
运算符,如下所示:
Flowable.just(1, 2, 3, 4, 5)
.buffer(2, 1)
.filter(e -> e.size() > 1)
.flatMapCompletable(e -> {
int first = e.get(0);
int second = e.get(1);
if (first == 2) {
if (second == 3) {
return Completable.complete();
} else {
return Completable.error(new Exception("..."));
}
}
return Completable.fromObservable(Observable.just(e));
})
更新
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS)
.share();
source
.skipWhile(e -> e != 5)
.flatMapCompletable(e -> Completable.fromObservable(source
.takeUntil(x -> x == 8)
.timeout(10, TimeUnit.SECONDS)))
.subscribe();
我需要检查一个无限可观察对象(来自设备的事件)是否发出一个特定的事件,我们称它为“开始”,然后是另一个“完成”。然而,在这两个事件之间,可以接收任意数量的不同事件,并且必须忽略它们。结果应该是 Completable.complete(),当“Started”事件后跟“Finished”事件之后在设置的超时前成功时,它是成功的。
对于这个问题我有一个有效的解决方案,但是它看起来很难看而且太复杂,我认为可能有更多 elegant/simple 的解决方案。我当前的代码看起来像这样,我已经概括了我的代码以便更容易理解,基本上在这个例子中我检查在 Flowable 发出数字“5”之后,在 10 秒超时之前接收到数字“8”:
Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
.publish().autoConnect(1);
return events
.filter(number -> number == 5)
.firstElement()
.concatMapCompletable(number -> {
if (number == 5) {
return events
.filter(number2 -> number2 == 8)
.firstElement()
.concatMapCompletable(number2 -> {
if (number2 == 8) {
return Completable.complete();
} else {
return Completable.error(new Exception("Number 3 expected, got " + number2));
}
});
} else {
return Completable.error(new Exception("Number 2 expected, got " + number));
}
})
.timeout(10, TimeUnit.SECONDS, Completable.error(new Exception("Timeout!")));
编辑: 我找到了一个更干净的版本,但是它看起来很奇怪,因为我使用 .filter 运算符然后在收到的第一个元素上完成,我 post 在下面作为参考:
Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
.publish().autoConnect(1);
TestObserver testObserver = events
.filter(number -> number == 5)
.firstElement()
.concatMapCompletable(number ->
events
.filter(number2 -> number2 == 8)
.firstElement()
.concatMapCompletable(number2 ->
Completable.complete()))
.timeout(10, TimeUnit.SECONDS, Completable.error(new Exception("Timeout!")))
.test();
更新 2: 我比较满意的版本:
Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
.publish().autoConnect(1);
TestObserver testObserver = events
.skipWhile(number -> number != 5)
.firstElement()
.flatMapCompletable(number -> Completable.fromObservable(events
.takeUntil(number2 -> number2 == 8)
.toObservable()
));
我不确定你到底想做什么,但你可以使用 buffer
或 window
运算符,如下所示:
Flowable.just(1, 2, 3, 4, 5)
.buffer(2, 1)
.filter(e -> e.size() > 1)
.flatMapCompletable(e -> {
int first = e.get(0);
int second = e.get(1);
if (first == 2) {
if (second == 3) {
return Completable.complete();
} else {
return Completable.error(new Exception("..."));
}
}
return Completable.fromObservable(Observable.just(e));
})
更新
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS)
.share();
source
.skipWhile(e -> e != 5)
.flatMapCompletable(e -> Completable.fromObservable(source
.takeUntil(x -> x == 8)
.timeout(10, TimeUnit.SECONDS)))
.subscribe();