使用 OkHttp3 和 ReactiveX 实现长轮询的正确方法 Java
Correct way to implement long polling using OkHttp3 and ReactiveX Java
如何使用 OkHttp3 (v4.4.1) 实现长轮询以获取 RxJava (v2.2.11) Observable 的每一行响应?可以在不阻塞线程以保持读取行的情况下完成吗?如果我需要阻塞某个线程,那么我应该阻塞哪个线程?关于使用 OkHttp3 实现长轮询的任何一般示例? Google在这个话题上对我很害羞...
TL;DR
我正在使用 OKHttp3 作为 HTTP 客户端,并将其包装在 makeGetObservable
方法调用中,returns Observable Response 使用 newCall 回调向 Observable 发出事件。现在我正在尝试添加对长轮询服务的支持,但我担心线程。
下面的代码演示了我正在尝试做的事情(并且似乎有效),但我很确定它不正常。
// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
// check for error and map to Observable<ResponseBody>
.map(this::mapRespBodyOrError)
// flat map to Observable<String> representing line of long polling response
.flatMap(respBody -> Observable.create(emitter -> {
// open reader on response body stream
try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
String line;
// block and wait to read a line from input
while((line = reader.readLine()) !=null) {
// once line was read from response body input stream emit it as observable event
emitter.onNext(line);
}
}
}));
经过一番研究,我发现被阻塞的线程是OkHttp客户端线程。这是由于我实现了 makeGetObservable
,它从 OkHttp 客户端的 newCall..enqueue 回调发出。
默认情况下,OkHttp 客户端有 5 个线程池,用于资源的 5 个并发连接。因此,每次我订阅另一个 long-polling 资源时,我都会阻塞其中一个线程。 5 个线程阻塞后,OkHttp 客户端停止工作,因为它没有线程来处理响应。
正如@Progman 所建议的那样,我使用 IO 调度程序subscribeOn
,它为每个阻塞的 IO 操作生成新线程。使用此调度程序必须小心并正确处理资源。
我的实现目前如下所示(添加了调度程序、完成和错误事件)
// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
// check for error and map to Observable<ResponseBody>
.map(this::mapRespBodyOrError)
// use IO scheduler that spawns new threads to take care of blocking operations
.observeOn(Schedulers.io())
// flat map to Observable<String> representing line of long polling response
.flatMap(respBody -> Observable.create(emitter -> {
try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
// blocking read lines while available
String line;
while ((line = reader.readLine()) != null) {
// emit event for every line
emitter.onNext(line);
}
// emit the completion event to indicate we are done
emitter.onComplete();
} catch (IOException | RuntimeException err) {
// emit any error that might have occurred
emitter.onError(err);
}
}));
如何使用 OkHttp3 (v4.4.1) 实现长轮询以获取 RxJava (v2.2.11) Observable 的每一行响应?可以在不阻塞线程以保持读取行的情况下完成吗?如果我需要阻塞某个线程,那么我应该阻塞哪个线程?关于使用 OkHttp3 实现长轮询的任何一般示例? Google在这个话题上对我很害羞...
TL;DR
我正在使用 OKHttp3 作为 HTTP 客户端,并将其包装在 makeGetObservable
方法调用中,returns Observable Response 使用 newCall 回调向 Observable 发出事件。现在我正在尝试添加对长轮询服务的支持,但我担心线程。
下面的代码演示了我正在尝试做的事情(并且似乎有效),但我很确定它不正常。
// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
// check for error and map to Observable<ResponseBody>
.map(this::mapRespBodyOrError)
// flat map to Observable<String> representing line of long polling response
.flatMap(respBody -> Observable.create(emitter -> {
// open reader on response body stream
try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
String line;
// block and wait to read a line from input
while((line = reader.readLine()) !=null) {
// once line was read from response body input stream emit it as observable event
emitter.onNext(line);
}
}
}));
经过一番研究,我发现被阻塞的线程是OkHttp客户端线程。这是由于我实现了 makeGetObservable
,它从 OkHttp 客户端的 newCall..enqueue 回调发出。
默认情况下,OkHttp 客户端有 5 个线程池,用于资源的 5 个并发连接。因此,每次我订阅另一个 long-polling 资源时,我都会阻塞其中一个线程。 5 个线程阻塞后,OkHttp 客户端停止工作,因为它没有线程来处理响应。
正如@Progman 所建议的那样,我使用 IO 调度程序subscribeOn
,它为每个阻塞的 IO 操作生成新线程。使用此调度程序必须小心并正确处理资源。
我的实现目前如下所示(添加了调度程序、完成和错误事件)
// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
// check for error and map to Observable<ResponseBody>
.map(this::mapRespBodyOrError)
// use IO scheduler that spawns new threads to take care of blocking operations
.observeOn(Schedulers.io())
// flat map to Observable<String> representing line of long polling response
.flatMap(respBody -> Observable.create(emitter -> {
try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
// blocking read lines while available
String line;
while ((line = reader.readLine()) != null) {
// emit event for every line
emitter.onNext(line);
}
// emit the completion event to indicate we are done
emitter.onComplete();
} catch (IOException | RuntimeException err) {
// emit any error that might have occurred
emitter.onError(err);
}
}));