使用 RxJava 和 Retrofit 实现 Room
Implement Room with RxJava and Retrofit
我正在尝试将 Room 与 RxJava 和 Retrofit 一起使用,在您推荐使用组件架构之前(在这个机会是不可能的,项目处于 50% 并且只需要继续架构清理)。
所以问题就是这个。我有一个 return 是 POJO 的 Web 服务。像这样:
{
"success":"true",
"message":"message",
"data":{[
"id":"id",
"name":"name",
"lname":"lname",
]}
}
POJO 更复杂,但对于这个例子来说没问题。我需要这样做,因为我的视图进行查询以从房间调用数据,但是如果我的数据库中没有数据调用我的 Web 服务,我的 Web 服务的响应将转换为实体并保存在我的数据库(房间)中,然后 return 数据列表到我看来。
我正在使用清洁拱门。我很感激任何帮助。再次不尝试使用
数据布局
- 数据库
- 网络
- 存储库
域
- 互动者
- 回调
演示文稿
- 主持人
- 查看
POJO API response
{
"success":"true",
"message":"message",
"data":{[
"id":"id",
"name":"name",
"address":"address",
"phone":"phone",
]}
}
My db entity
@Entity(tableName = "clients")
public class clients {
String id;
String name;
String address;
String phone;
String status;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
My dao for room
@Dao
public interface ClientsDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
void saveAll(List<Clients> clients);
@Query("SELECT * FROM Clients")
Flowable<List<Clients>> listClients();
}
RxJava help class
public class RxHelper {
private static final String TAG = RxHelper.class.getName();
@NonNull
public static <T>Observable<T> getObserbable(@NonNull final Call<T> reponse){
return Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(final ObservableEmitter<T> emitter) throws Exception {
reponse.enqueue(new Callback<T>() {
@Override
public void onResponse(Call<T> call, Response<T> response) {
if (!emitter.isDisposed()) {
emitter.onNext(response.body());
}
}
@Override
public void onFailure(Call<T> call, Throwable t) {
if (!emitter.isDisposed()) {
emitter.onError(t);
}
}
});
}
});
}
}
My ClientsRepoFactory
public Observable<ResponseClients> getApiClients(){
String token = preferences.getValue(SDConstants.token);
return RxHelper.getObserbable(apiNetwork.getClients(token));
}
My ClientsRepo
@Override
public Observable<ResponseClients> listClients() {
return factory.listClients();
}
我不使用 room 但熟悉 rxjava 你可以像那样设计你的存储库
你的房间界面
@Query(“SELECT * FROM Users WHERE id = :userId”)
Single<User> getUserById(String userId);
使用时:
可能 当数据库中没有用户并且查询return没有行时,可能会完成。
Flowable 每次更新用户数据时,Flowable对象会自动emit,让你根据最新的数据更新UI
Single当数据库中没有用户且查询returns没有行时,Single会触发onError(EmptyResultSetException.class)
阅读有关 Room 和 RxJava 的更多信息 link
要实现“如果数据库中没有数据调用 Web 服务”,请创建您的存储库方法
public Single<User> getUserById(String userId){
return db.getUserById(userId)
/// if there is no user in the database get data from api
.onErrorResumeNext(api.getUserById(userId)
.subscribeOn(Schedulers.io())
//check your request
.filter(statusPojo::getStatus)
// save data to room
.switchMap(data -> {
//sava data to db
return Observable.just(data)
})
);
}
最终从交互器调用存储库方法,将 obsrevable 传递给交互器,然后传递给演示布局
更多细节:您可以将 Api 和数据库注入您的存储库
update_Answer 对于反应式数据库
如果你想获得 UI 的最新更新,那就去做吧:
你的房间界面:
@Query(“SELECT * FROM Users WHERE id = :userId”)
Flowable<User> getUserById(String userId);
存储库:
@Override
public Flowable<User> getUser(int id) {
getUserFromNet(id);
//first emit cache data in db and after request complete emit last update from net
return db.getUserById(id);
}
private Flowable<User> getUserFromNet(int id){
api.getUserById(userId)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
//check your request
.filter(statusPojo::getStatus)
// save data to room
.subscribe(new DisposableObserver<User>() {
@Override
public void onNext(User user) {
// save data to room
}
@Override
public void onError(Throwable e) {
Timber.e(e);
}
@Override
public void onComplete() {
}
});
}
update_Answer2 对于反应式数据库和“如果数据库中没有数据调用网络服务”
根据这个 issue 最好使用 return a Flowable <List<T>>
并检查列表大小而不是 Flowable<T>
白色 swichIfEmpity
因为如果数据库中没有任何用户 Flowable<T>
就不会调用 onNext()
并且不会发出FlowableEmpity();
private Flowable<List<User>> getUser(int id){
return db.getUserById(id).
/// if there is no user in the database get data from
.flatMp(userList->
if(userList.size==0)
api.getUserById(userId)
.subscribeOn(Schedulers.io())
//check your request
.filter(statusPojo::getStatus)
// save data to room
.subscribe(new DisposableObserver<User>() {
@Override
public void onNext(User user) {
// save data to room
}
@Override
public void onError(Throwable e) {
Timber.e(e);
}
@Override
public void onComplete() {
}
});
return Flowable.just(data)
);
}
Kotlin 方式 改造,分页(pagingRX androidx)和空间:
房道:
@Dao
abstract class UserDao {
@Query("SELECT * FROM users ")
abstract fun findAll(): DataSource.Factory<Int, User>
}
存储库:
private fun getFromDB(pageSize:Int): Flowable<PagedList<User>> {
return RxPagedListBuilder(userDao.findAll(), pageSize)
.buildFlowable(BackpressureStrategy.LATEST)
}
private fun getApi(page: Int,pageSize: Int): Disposable {
return api.getUserList("token", page = page,perPage = pageSize)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe { t1: List<User>?, t2: Throwable? ->
t1?.let {
if (it.isNotEmpty())
userDao.insert(it)
}
}
}
override fun findAll(page: Int ,pageSize:Int ):
Flowable<PagedList<User>> {
return getFromDB(pageSize).doOnSubscribe { getApi(page,pageSize) }
}
我正在尝试将 Room 与 RxJava 和 Retrofit 一起使用,在您推荐使用组件架构之前(在这个机会是不可能的,项目处于 50% 并且只需要继续架构清理)。
所以问题就是这个。我有一个 return 是 POJO 的 Web 服务。像这样:
{
"success":"true",
"message":"message",
"data":{[
"id":"id",
"name":"name",
"lname":"lname",
]}
}
POJO 更复杂,但对于这个例子来说没问题。我需要这样做,因为我的视图进行查询以从房间调用数据,但是如果我的数据库中没有数据调用我的 Web 服务,我的 Web 服务的响应将转换为实体并保存在我的数据库(房间)中,然后 return 数据列表到我看来。
我正在使用清洁拱门。我很感激任何帮助。再次不尝试使用
数据布局
- 数据库
- 网络
- 存储库
域
- 互动者
- 回调
演示文稿
- 主持人
- 查看
POJO API response
{
"success":"true",
"message":"message",
"data":{[
"id":"id",
"name":"name",
"address":"address",
"phone":"phone",
]}
}
My db entity
@Entity(tableName = "clients")
public class clients {
String id;
String name;
String address;
String phone;
String status;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
My dao for room
@Dao
public interface ClientsDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
void saveAll(List<Clients> clients);
@Query("SELECT * FROM Clients")
Flowable<List<Clients>> listClients();
}
RxJava help class
public class RxHelper {
private static final String TAG = RxHelper.class.getName();
@NonNull
public static <T>Observable<T> getObserbable(@NonNull final Call<T> reponse){
return Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(final ObservableEmitter<T> emitter) throws Exception {
reponse.enqueue(new Callback<T>() {
@Override
public void onResponse(Call<T> call, Response<T> response) {
if (!emitter.isDisposed()) {
emitter.onNext(response.body());
}
}
@Override
public void onFailure(Call<T> call, Throwable t) {
if (!emitter.isDisposed()) {
emitter.onError(t);
}
}
});
}
});
}
}
My ClientsRepoFactory
public Observable<ResponseClients> getApiClients(){
String token = preferences.getValue(SDConstants.token);
return RxHelper.getObserbable(apiNetwork.getClients(token));
}
My ClientsRepo
@Override
public Observable<ResponseClients> listClients() {
return factory.listClients();
}
我不使用 room 但熟悉 rxjava 你可以像那样设计你的存储库
你的房间界面
@Query(“SELECT * FROM Users WHERE id = :userId”)
Single<User> getUserById(String userId);
使用时:
可能 当数据库中没有用户并且查询return没有行时,可能会完成。
Flowable 每次更新用户数据时,Flowable对象会自动emit,让你根据最新的数据更新UI
Single当数据库中没有用户且查询returns没有行时,Single会触发onError(EmptyResultSetException.class)
阅读有关 Room 和 RxJava 的更多信息 link
要实现“如果数据库中没有数据调用 Web 服务”,请创建您的存储库方法
public Single<User> getUserById(String userId){
return db.getUserById(userId)
/// if there is no user in the database get data from api
.onErrorResumeNext(api.getUserById(userId)
.subscribeOn(Schedulers.io())
//check your request
.filter(statusPojo::getStatus)
// save data to room
.switchMap(data -> {
//sava data to db
return Observable.just(data)
})
);
}
最终从交互器调用存储库方法,将 obsrevable 传递给交互器,然后传递给演示布局
更多细节:您可以将 Api 和数据库注入您的存储库
update_Answer 对于反应式数据库 如果你想获得 UI 的最新更新,那就去做吧:
你的房间界面:
@Query(“SELECT * FROM Users WHERE id = :userId”)
Flowable<User> getUserById(String userId);
存储库:
@Override
public Flowable<User> getUser(int id) {
getUserFromNet(id);
//first emit cache data in db and after request complete emit last update from net
return db.getUserById(id);
}
private Flowable<User> getUserFromNet(int id){
api.getUserById(userId)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
//check your request
.filter(statusPojo::getStatus)
// save data to room
.subscribe(new DisposableObserver<User>() {
@Override
public void onNext(User user) {
// save data to room
}
@Override
public void onError(Throwable e) {
Timber.e(e);
}
@Override
public void onComplete() {
}
});
}
update_Answer2 对于反应式数据库和“如果数据库中没有数据调用网络服务”
根据这个 issue 最好使用 return a Flowable <List<T>>
并检查列表大小而不是 Flowable<T>
白色 swichIfEmpity
因为如果数据库中没有任何用户 Flowable<T>
就不会调用 onNext()
并且不会发出FlowableEmpity();
private Flowable<List<User>> getUser(int id){
return db.getUserById(id).
/// if there is no user in the database get data from
.flatMp(userList->
if(userList.size==0)
api.getUserById(userId)
.subscribeOn(Schedulers.io())
//check your request
.filter(statusPojo::getStatus)
// save data to room
.subscribe(new DisposableObserver<User>() {
@Override
public void onNext(User user) {
// save data to room
}
@Override
public void onError(Throwable e) {
Timber.e(e);
}
@Override
public void onComplete() {
}
});
return Flowable.just(data)
);
}
Kotlin 方式 改造,分页(pagingRX androidx)和空间:
房道:
@Dao
abstract class UserDao {
@Query("SELECT * FROM users ")
abstract fun findAll(): DataSource.Factory<Int, User>
}
存储库:
private fun getFromDB(pageSize:Int): Flowable<PagedList<User>> {
return RxPagedListBuilder(userDao.findAll(), pageSize)
.buildFlowable(BackpressureStrategy.LATEST)
}
private fun getApi(page: Int,pageSize: Int): Disposable {
return api.getUserList("token", page = page,perPage = pageSize)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe { t1: List<User>?, t2: Throwable? ->
t1?.let {
if (it.isNotEmpty())
userDao.insert(it)
}
}
}
override fun findAll(page: Int ,pageSize:Int ):
Flowable<PagedList<User>> {
return getFromDB(pageSize).doOnSubscribe { getApi(page,pageSize) }
}