在RxJava中使用"skipWhile"结合"repeatWhen"实现服务器轮询
Using of "skipWhile" combined with "repeatWhen" in RxJava to implement server polling
我真的很喜欢 RxJava,它是一个很棒的工具,但有时很难理解它是如何工作的。
我们在 Android 项目中使用带有 RxJava 的 Retrofit,并且有以下用例:
我需要轮询服务器,重试之间有一些延迟,而服务器正在做一些工作。服务器完成后,我必须交付结果。所以我已经用 RxJava 成功地完成了,这里是代码片段:
我用 "skipWhile" 和 "repeatWhen"
Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
.skipWhile(new Func1<CheckJobResponse, Boolean>() {
@Override
public Boolean call(CheckJobResponse checkJobResponse) {
boolean shouldSkip = false;
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());
switch (checkJobResponse.getJobStatus()){
case CheckJobResponse.PROCESSING:
shouldSkip = true;
break;
case CheckJobResponse.DONE:
case CheckJobResponse.ERROR:
shouldSkip = false;
break;
}
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);
return shouldSkip;
}
})
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
return observable.delay(1, TimeUnit.SECONDS);
}
}).subscribe(new Subscriber<CheckJobResponse>(){
@Override
public void onNext(CheckJobResponse response) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);
}
@Override
public void onError(BaseError error) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();
}
@Override
public void onCompleted() {
if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
}
});
代码工作正常:
当服务器响应作业正在处理来自 "skipWhile" 链的 I return "true" 时,原始 Observable 等待 1 秒并再次执行 http 请求。
重复此过程,直到 I return "false" from "skipWhile" chain.
以下是我不明白的几点:
我在 "skipWhile" 的文档中看到,在我 return "false" 从它的 "call" 方法。所以如果它不发出任何东西,为什么 "repeatWhen" Observable 会做它的工作?它等待一秒钟,然后 运行 再次请求。谁发起的?
第二个问题是:为什么 "repeatWhen" 的 Observable 不会永远 运行ning,我的意思是为什么当我 return [=49= 时它停止重复] 来自 "skipWhile"?如果 return "false".
在 "repeatWhile" 的文档中说,最终我在我的订阅者中接到了对 "onComplete" 的调用,但从未调用过 "onComplete"。
如果我改变链接 "skipWhile" 和 "repeatWhen" 的顺序没有任何区别。这是为什么?
我知道 RxJava 是开源的,我只能阅读代码,但正如我所说 - 真的很难理解。
谢谢。
我以前没有和 repeatWhen
一起工作过,但是这个问题让我很好奇,所以我做了一些研究。
skipWhile
会发出 onError
和 onCompleted
,即使之前从未 returns true
然后。因此,每次 checkJob()
发出 onCompleted
时都会调用 repeatWhen
。这回答了问题 #1。
其余问题均基于错误的假设。您的订阅永远 运行 因为您的 repeatWhen
永远不会终止。那是因为 repeatWhen
比你想象的要复杂得多。每当从源中获取 onCompleted
时,其中的 Observable
就会发出 null
。如果你接受它并且 return onCompleted
然后它结束,否则如果你发出任何东西它会重试。由于 delay
只是发出并延迟它,它总是再次发出 null
。因此,它会不断重新订阅。
#2 的答案是,它永远 运行;您可能正在执行此代码之外的其他操作来取消订阅。对于#3,你永远不会得到 onCompleted
因为它永远不会完成。对于 #4,顺序无关紧要,因为您要无限期地重复。
现在的问题是,如何获得正确的行为?就像使用 takeUntil
而不是 skipWhile
一样简单。这样,您就可以不断重复 直到 得到您想要的结果,从而在您希望它结束时终止流。
这是一个代码示例:
Observable<Boolean> source = ...; // Something that eventually emits true
source
.repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
.takeUntil(result -> result)
.filter(result -> result)
.subscribe(
res -> System.out.println("onNext(" + res + ")"),
err -> System.out.println("onError()"),
() -> System.out.println("onCompleted()")
);
在此示例中,source
发出布尔值。我每 1 秒重复一次,直到源发出 true
。我一直服用直到 result
是 true
。我过滤掉所有 false
的通知,因此订阅者在 true
.
之前不会收到它们
我真的很喜欢 RxJava,它是一个很棒的工具,但有时很难理解它是如何工作的。 我们在 Android 项目中使用带有 RxJava 的 Retrofit,并且有以下用例:
我需要轮询服务器,重试之间有一些延迟,而服务器正在做一些工作。服务器完成后,我必须交付结果。所以我已经用 RxJava 成功地完成了,这里是代码片段: 我用 "skipWhile" 和 "repeatWhen"
Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
.skipWhile(new Func1<CheckJobResponse, Boolean>() {
@Override
public Boolean call(CheckJobResponse checkJobResponse) {
boolean shouldSkip = false;
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());
switch (checkJobResponse.getJobStatus()){
case CheckJobResponse.PROCESSING:
shouldSkip = true;
break;
case CheckJobResponse.DONE:
case CheckJobResponse.ERROR:
shouldSkip = false;
break;
}
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);
return shouldSkip;
}
})
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
return observable.delay(1, TimeUnit.SECONDS);
}
}).subscribe(new Subscriber<CheckJobResponse>(){
@Override
public void onNext(CheckJobResponse response) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);
}
@Override
public void onError(BaseError error) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();
}
@Override
public void onCompleted() {
if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
}
});
代码工作正常:
当服务器响应作业正在处理来自 "skipWhile" 链的 I return "true" 时,原始 Observable 等待 1 秒并再次执行 http 请求。 重复此过程,直到 I return "false" from "skipWhile" chain.
以下是我不明白的几点:
我在 "skipWhile" 的文档中看到,在我 return "false" 从它的 "call" 方法。所以如果它不发出任何东西,为什么 "repeatWhen" Observable 会做它的工作?它等待一秒钟,然后 运行 再次请求。谁发起的?
第二个问题是:为什么 "repeatWhen" 的 Observable 不会永远 运行ning,我的意思是为什么当我 return [=49= 时它停止重复] 来自 "skipWhile"?如果 return "false".
在 "repeatWhile" 的文档中说,最终我在我的订阅者中接到了对 "onComplete" 的调用,但从未调用过 "onComplete"。
如果我改变链接 "skipWhile" 和 "repeatWhen" 的顺序没有任何区别。这是为什么?
我知道 RxJava 是开源的,我只能阅读代码,但正如我所说 - 真的很难理解。
谢谢。
我以前没有和 repeatWhen
一起工作过,但是这个问题让我很好奇,所以我做了一些研究。
skipWhile
会发出 onError
和 onCompleted
,即使之前从未 returns true
然后。因此,每次 checkJob()
发出 onCompleted
时都会调用 repeatWhen
。这回答了问题 #1。
其余问题均基于错误的假设。您的订阅永远 运行 因为您的 repeatWhen
永远不会终止。那是因为 repeatWhen
比你想象的要复杂得多。每当从源中获取 onCompleted
时,其中的 Observable
就会发出 null
。如果你接受它并且 return onCompleted
然后它结束,否则如果你发出任何东西它会重试。由于 delay
只是发出并延迟它,它总是再次发出 null
。因此,它会不断重新订阅。
#2 的答案是,它永远 运行;您可能正在执行此代码之外的其他操作来取消订阅。对于#3,你永远不会得到 onCompleted
因为它永远不会完成。对于 #4,顺序无关紧要,因为您要无限期地重复。
现在的问题是,如何获得正确的行为?就像使用 takeUntil
而不是 skipWhile
一样简单。这样,您就可以不断重复 直到 得到您想要的结果,从而在您希望它结束时终止流。
这是一个代码示例:
Observable<Boolean> source = ...; // Something that eventually emits true
source
.repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
.takeUntil(result -> result)
.filter(result -> result)
.subscribe(
res -> System.out.println("onNext(" + res + ")"),
err -> System.out.println("onError()"),
() -> System.out.println("onCompleted()")
);
在此示例中,source
发出布尔值。我每 1 秒重复一次,直到源发出 true
。我一直服用直到 result
是 true
。我过滤掉所有 false
的通知,因此订阅者在 true
.