Android Retrofit 2 + RxJava:听不完的流
Android Retrofit 2 + RxJava: listen to endless stream
可以用Retrofit + RxJava听个不停吗?例如 Twitter 流。我有的是:
public interface MeetupAPI {
@GET("http://stream.meetup.com/2/rsvps/")
Observable<RSVP> getRSVPs();
}
MeetupAPI api = new Retrofit.Builder()
.baseUrl(MeetupAPI.RSVP_API)
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build()
.create(MeetupAPI.class);
api.getRSVPs()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(rsvp -> Log.d(TAG, "got rsvp"),
error -> Log.d(TAG, "error: " + error),
() -> Log.d(TAG, "onComplete"));
但是 "onComplete" 在第一个对象被解析后被调用。有没有办法让 Retrofit 保持开放状态,直至另行通知?
这是我的解决方案:
您可以使用@Streaming 注释:
public interface ITwitterAPI {
@GET("/2/rsvps")
@Streaming
Observable<ResponseBody> twitterStream();
}
ITwitterAPI api = new Retrofit.Builder()
.baseUrl("http://stream.meetup.com")
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build().create(ITwitterAPI.class);
使用 @Streaming
我们可以从 ResponseBody
获得原始输入。
这里是我的函数,用于将正文按事件行分隔:
public static Observable<String> events(BufferedSource source) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
subscriber.onCompleted();
} catch (IOException e) {
e.printStackTrace();
subscriber.onError(e);
}
}
});
}
结果用法:
api.twitterStream()
.flatMap(responseBody -> events(responseBody.source()))
.subscribe(System.out::println);
关于优雅停止的更新
当我们取消订阅时,改造会关闭输入流。但是不可能从输入流本身检测到输入流是否关闭,所以唯一的方法 - 尝试从流中读取 - 我们会收到 Socket closed
消息的异常。
我们可以将此异常解释为结束:
@Override
public void call(Subscriber<? super String> subscriber) {
boolean isCompleted = false;
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
} catch (IOException e) {
if (e.getMessage().equals("Socket closed")) {
isCompleted = true;
subscriber.onCompleted();
} else {
throw new UncheckedIOException(e);
}
}
//if response end we get here
if (!isCompleted) {
subscriber.onCompleted();
}
}
如果连接因为响应结束而关闭,我们没有任何例外。在这里 isCompleted
检查一下。如果我错了请告诉我:)
Zella's 适用于带有 rxJava 的 Retrofit2,
对于 rxJava2,我像这样修改了自定义可观察对象:
//imports
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import okio.BufferedSource
import java.io.IOException
fun events(source: BufferedSource): Observable<String> {
return Observable.create { emitter ->
var isCompleted = false
try {
while (!source.exhausted()) {
emitter.onNext(source.readUtf8Line()!!)
}
emitter.onComplete()
} catch (e: IOException) {
e.printStackTrace()
if (e.message == "Socket closed") {
isCompleted = true
emitter.onComplete()
} else {
throw IOException(e)
}
}
if (!isCompleted) {
emitter.onComplete()
}
}
}
模块级别的更改build.gradle 依赖项:
//retrofit rxJava2 adapter
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.7.1'
//rx-java
implementation 'io.reactivex.rxjava2:rxjava:2.2.11'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
改造适配器更改:
ITwitterAPI api = new Retrofit.Builder()
.baseUrl("http://stream.meetup.com")
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build().create(ITwitterAPI.class);
并将流媒体 API 称为
api.twitterStream()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap { responseBody-> events(responseBody.source()) }
.subscribe({ t ->
Log.i(TAG, "onNext t=$t")
}, { e ->
Log.i(TAG, "onError e=$e")
}, {
Log.i(TAG, "onFinish")
})
可以用Retrofit + RxJava听个不停吗?例如 Twitter 流。我有的是:
public interface MeetupAPI {
@GET("http://stream.meetup.com/2/rsvps/")
Observable<RSVP> getRSVPs();
}
MeetupAPI api = new Retrofit.Builder()
.baseUrl(MeetupAPI.RSVP_API)
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build()
.create(MeetupAPI.class);
api.getRSVPs()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(rsvp -> Log.d(TAG, "got rsvp"),
error -> Log.d(TAG, "error: " + error),
() -> Log.d(TAG, "onComplete"));
但是 "onComplete" 在第一个对象被解析后被调用。有没有办法让 Retrofit 保持开放状态,直至另行通知?
这是我的解决方案:
您可以使用@Streaming 注释:
public interface ITwitterAPI {
@GET("/2/rsvps")
@Streaming
Observable<ResponseBody> twitterStream();
}
ITwitterAPI api = new Retrofit.Builder()
.baseUrl("http://stream.meetup.com")
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build().create(ITwitterAPI.class);
使用 @Streaming
我们可以从 ResponseBody
获得原始输入。
这里是我的函数,用于将正文按事件行分隔:
public static Observable<String> events(BufferedSource source) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
subscriber.onCompleted();
} catch (IOException e) {
e.printStackTrace();
subscriber.onError(e);
}
}
});
}
结果用法:
api.twitterStream()
.flatMap(responseBody -> events(responseBody.source()))
.subscribe(System.out::println);
关于优雅停止的更新
当我们取消订阅时,改造会关闭输入流。但是不可能从输入流本身检测到输入流是否关闭,所以唯一的方法 - 尝试从流中读取 - 我们会收到 Socket closed
消息的异常。
我们可以将此异常解释为结束:
@Override
public void call(Subscriber<? super String> subscriber) {
boolean isCompleted = false;
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
} catch (IOException e) {
if (e.getMessage().equals("Socket closed")) {
isCompleted = true;
subscriber.onCompleted();
} else {
throw new UncheckedIOException(e);
}
}
//if response end we get here
if (!isCompleted) {
subscriber.onCompleted();
}
}
如果连接因为响应结束而关闭,我们没有任何例外。在这里 isCompleted
检查一下。如果我错了请告诉我:)
Zella's
//imports
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import okio.BufferedSource
import java.io.IOException
fun events(source: BufferedSource): Observable<String> {
return Observable.create { emitter ->
var isCompleted = false
try {
while (!source.exhausted()) {
emitter.onNext(source.readUtf8Line()!!)
}
emitter.onComplete()
} catch (e: IOException) {
e.printStackTrace()
if (e.message == "Socket closed") {
isCompleted = true
emitter.onComplete()
} else {
throw IOException(e)
}
}
if (!isCompleted) {
emitter.onComplete()
}
}
}
模块级别的更改build.gradle 依赖项:
//retrofit rxJava2 adapter
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.7.1'
//rx-java
implementation 'io.reactivex.rxjava2:rxjava:2.2.11'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
改造适配器更改:
ITwitterAPI api = new Retrofit.Builder()
.baseUrl("http://stream.meetup.com")
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build().create(ITwitterAPI.class);
并将流媒体 API 称为
api.twitterStream()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap { responseBody-> events(responseBody.source()) }
.subscribe({ t ->
Log.i(TAG, "onNext t=$t")
}, { e ->
Log.i(TAG, "onError e=$e")
}, {
Log.i(TAG, "onFinish")
})