使用 RxJava 和 Okhttp
Using RxJava and Okhttp
我想在另一个线程(如 IO 线程)中使用 okhttp 请求 url 并在 Android 主线程中获取 Response
,但我不知道如何创建一个 Observable
.
首先将 RxAndroid
添加到您的依赖项,然后像这样创建您的 Observable
:
Subscription subscription = Observable.create(new Observable.OnSubscribe<Response>() {
OkHttpClient client = new OkHttpClient();
@Override
public void call(Subscriber<? super Response> subscriber) {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
if (response.isSuccessful()) {
if(!subscriber.isUnsubscribed()){
subscriber.onNext(response);
}
subscriber.onCompleted();
} else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) {
subscriber.onError(new Exception("error"));
}
} catch (IOException e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Response>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Response response) {
}
});
它将在另一个线程(io 线程)中请求您的 url,并在 android 主线程中观察它。
最后,当您离开屏幕时使用 subsribtion.unsubscribe()
以避免内存泄漏。
当你使用Observable.create
时,你应该写很多样板代码,而且你必须自己处理订阅。更好的选择是使用 defer。
形成文档:
do not create the Observable until the observer subscribes, and create
a fresh Observable for each observer
The Defer operator waits until an observer subscribes to it, and then
it generates an Observable, typically with an Observable factory
function. It does this afresh for each subscriber, so although each
subscriber may think it is subscribing to the same Observable, in fact
each subscriber gets its own individual sequence.
所以正如提到的,你只需要这样做:
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
@Override public Observable<Response> call() {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
return Observable.just(response);
} catch (IOException e) {
return Observable.error(e);
}
}
});
使用 Observable.defer()
比 Observable.create()
更容易和更安全:
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
@Override public Observable<Response> call() {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
return Observable.just(response);
} catch (IOException e) {
return Observable.error(e);
}
}
});
这样可以为您处理退订和背压。 a great post by Dan Lew 关于 create()
和 defer()
。
如果您想走 Observable.create()
路线,那么它应该看起来更像 this library 中到处都是 isUnsubscribed()
调用。而且我相信这仍然不能处理背压。
我意识到这个 post 有点旧,但现在有一种新的更方便的方法
Observable.fromCallable {
client.newCall(Request.Builder().url("your url").build()).execute()
}
更多信息:https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/
我来晚了,但是,如果出于某种原因代码需要流式传输响应正文,那么 defer
或 fromCallable
不会这样做。相反,我们可以使用 using
运算符。
Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
response -> { // 2
...
return Single.just((Consumer<OutputStream>) fileOutput -> {
try (InputStream upstreamResponseStream = response.body().byteStream();
OutputStream fileOutput = responseBodyOutput) {
ByteStreams.copy(upstreamResponseStream, output);
}
});
},
Response::close, // 3
false) // 4
.subscribeOn(Schedulers.io()) // 5
.subscribe(copier -> copier.accept(...), // 6
throwable -> ...); // 7
- 第一个 lambda 在订阅后执行响应。
- 第二个 lambda 创建可观察类型,这里是
Single.just(...)
- 第三个 lambda 处理响应。使用
defer
可以使用 try-with-resources 样式。
- 将
eager
开关设置为 false
以使处理程序在终端事件之后调用,即在执行订阅消费者之后。
- 当然让事情发生在另一个线程池上
- 这是将使用响应主体的 lambda。如果
eager
设置为 false
,代码将引发 IOException,原因是 'closed',因为在进入此 lambda 之前响应已经关闭。
onError
lambda 应该处理异常,尤其是 IOException
不能再被 using
运算符捕获,因为 try/catch 和 defer
.
带有 RxSingle 背景的 Okhttp3 API 调用。
Disposable disposables = Single.fromCallable(() -> {
Log.e(TAG, "clearData: Thread[" + Thread.currentThread().getName() + "]");
OkHttpClient client = Util.getHttpClient();
Request request = new Request.Builder()
.addHeader("Authorization", "Bearer " + Util.getUserToken())
.url(BuildConfig.BASE_URL + ApiConstants.DELETE_FEEDS)
.build();
Response response = client.newCall(request).execute();
if(response.isSuccessful()) {
...
return ; // Any type
} else {
return ; // Any type
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((result) -> {
Log.d(TAG, "api() completed");
});
compositeDisposable.add(disposables);
我想在另一个线程(如 IO 线程)中使用 okhttp 请求 url 并在 Android 主线程中获取 Response
,但我不知道如何创建一个 Observable
.
首先将 RxAndroid
添加到您的依赖项,然后像这样创建您的 Observable
:
Subscription subscription = Observable.create(new Observable.OnSubscribe<Response>() {
OkHttpClient client = new OkHttpClient();
@Override
public void call(Subscriber<? super Response> subscriber) {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
if (response.isSuccessful()) {
if(!subscriber.isUnsubscribed()){
subscriber.onNext(response);
}
subscriber.onCompleted();
} else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) {
subscriber.onError(new Exception("error"));
}
} catch (IOException e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Response>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Response response) {
}
});
它将在另一个线程(io 线程)中请求您的 url,并在 android 主线程中观察它。
最后,当您离开屏幕时使用 subsribtion.unsubscribe()
以避免内存泄漏。
当你使用Observable.create
时,你应该写很多样板代码,而且你必须自己处理订阅。更好的选择是使用 defer。
形成文档:
do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual sequence.
所以正如
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
@Override public Observable<Response> call() {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
return Observable.just(response);
} catch (IOException e) {
return Observable.error(e);
}
}
});
使用 Observable.defer()
比 Observable.create()
更容易和更安全:
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
@Override public Observable<Response> call() {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
return Observable.just(response);
} catch (IOException e) {
return Observable.error(e);
}
}
});
这样可以为您处理退订和背压。 a great post by Dan Lew 关于 create()
和 defer()
。
如果您想走 Observable.create()
路线,那么它应该看起来更像 this library 中到处都是 isUnsubscribed()
调用。而且我相信这仍然不能处理背压。
我意识到这个 post 有点旧,但现在有一种新的更方便的方法
Observable.fromCallable {
client.newCall(Request.Builder().url("your url").build()).execute()
}
更多信息:https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/
我来晚了,但是,如果出于某种原因代码需要流式传输响应正文,那么 defer
或 fromCallable
不会这样做。相反,我们可以使用 using
运算符。
Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
response -> { // 2
...
return Single.just((Consumer<OutputStream>) fileOutput -> {
try (InputStream upstreamResponseStream = response.body().byteStream();
OutputStream fileOutput = responseBodyOutput) {
ByteStreams.copy(upstreamResponseStream, output);
}
});
},
Response::close, // 3
false) // 4
.subscribeOn(Schedulers.io()) // 5
.subscribe(copier -> copier.accept(...), // 6
throwable -> ...); // 7
- 第一个 lambda 在订阅后执行响应。
- 第二个 lambda 创建可观察类型,这里是
Single.just(...)
- 第三个 lambda 处理响应。使用
defer
可以使用 try-with-resources 样式。 - 将
eager
开关设置为false
以使处理程序在终端事件之后调用,即在执行订阅消费者之后。 - 当然让事情发生在另一个线程池上
- 这是将使用响应主体的 lambda。如果
eager
设置为false
,代码将引发 IOException,原因是 'closed',因为在进入此 lambda 之前响应已经关闭。 onError
lambda 应该处理异常,尤其是IOException
不能再被using
运算符捕获,因为 try/catch 和defer
.
带有 RxSingle 背景的 Okhttp3 API 调用。
Disposable disposables = Single.fromCallable(() -> {
Log.e(TAG, "clearData: Thread[" + Thread.currentThread().getName() + "]");
OkHttpClient client = Util.getHttpClient();
Request request = new Request.Builder()
.addHeader("Authorization", "Bearer " + Util.getUserToken())
.url(BuildConfig.BASE_URL + ApiConstants.DELETE_FEEDS)
.build();
Response response = client.newCall(request).execute();
if(response.isSuccessful()) {
...
return ; // Any type
} else {
return ; // Any type
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((result) -> {
Log.d(TAG, "api() completed");
});
compositeDisposable.add(disposables);