使用 RxJava 和 Retrofit 实现 Room

Implement Room with RxJava and Retrofit

我正在尝试将 Room 与 RxJava 和 Retrofit 一起使用,在您推荐使用组件架构之前(在这个机会是不可能的,项目处于 50% 并且只需要继续架构清理)。

所以问题就是这个。我有一个 return 是 POJO 的 Web 服务。像这样:

{
 "success":"true",
 "message":"message",
 "data":{[
   "id":"id",
   "name":"name",
   "lname":"lname",
 ]} 
}

POJO 更复杂,但对于这个例子来说没问题。我需要这样做,因为我的视图进行查询以从房间调用数据,但是如果我的数据库中没有数据调用我的 Web 服务,我的 Web 服务的响应将转换为实体并保存在我的数据库(房间)中,然后 return 数据列表到我看来。

我正在使用清洁拱门。我很感激任何帮助。再次不尝试使用

数据布局

演示文稿

POJO API response

{
 "success":"true",
 "message":"message",
 "data":{[
   "id":"id",
   "name":"name",
   "address":"address",
   "phone":"phone",
 ]} 
}

My db entity

@Entity(tableName = "clients")
    public class clients {

    String id;
    String name;
    String address;
    String phone;
    String status;


    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}

My dao for room

@Dao
public interface ClientsDao {

     @Insert(onConflict = OnConflictStrategy.REPLACE)
     void saveAll(List<Clients> clients);

     @Query("SELECT * FROM Clients")
     Flowable<List<Clients>> listClients();

}

RxJava help class

public class RxHelper {
private static final String TAG = RxHelper.class.getName();

@NonNull
public static <T>Observable<T> getObserbable(@NonNull final Call<T> reponse){

    return Observable.create(new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(final ObservableEmitter<T> emitter) throws Exception {

            reponse.enqueue(new Callback<T>() {
                @Override
                public void onResponse(Call<T> call, Response<T> response) {

                    if (!emitter.isDisposed()) {
                        emitter.onNext(response.body());
                    }
                }

                @Override
                public void onFailure(Call<T> call, Throwable t) {
                    if (!emitter.isDisposed()) {
                        emitter.onError(t);
                    }
                }
            });

        }
    });

}
}

My ClientsRepoFactory

public Observable<ResponseClients> getApiClients(){
        String token = preferences.getValue(SDConstants.token);
        return RxHelper.getObserbable(apiNetwork.getClients(token));
}

My ClientsRepo

@Override
public Observable<ResponseClients> listClients() {
    return factory.listClients();
}

我不使用 room 但熟悉 rxjava 你可以像那样设计你的存储库

你的房间界面

@Query(“SELECT * FROM Users WHERE id = :userId”)
Single<User> getUserById(String userId);

使用时:
可能 当数据库中没有用户并且查询return没有行时,可能会完成。

Flowable 每次更新用户数据时,Flowable对象会自动emit,让你根据最新的数据更新UI

Single当数据库中没有用户且查询returns没有行时,Single会触发onError(EmptyResultSetException.class)

阅读有关 Room 和 RxJava 的更多信息 link

要实现“如果数据库中没有数据调用 Web 服务”,请创建您的存储库方法

public Single<User> getUserById(String userId){
 return  db.getUserById(userId)
              /// if there is no user in the database get data from api
             .onErrorResumeNext(api.getUserById(userId)
              .subscribeOn(Schedulers.io())
              //check your request
              .filter(statusPojo::getStatus)
               // save data to room
              .switchMap(data -> {
              //sava data to db
              return Observable.just(data)
              })
           );

}

最终从交互器调用存储库方法,将 obsrevable 传递给交互器,然后传递给演示布局

更多细节:您可以将 Api 和数据库注入您的存储库

update_Answer 对于反应式数据库 如果你想获得 UI 的最新更新,那就去做吧:

你的房间界面:

@Query(“SELECT * FROM Users WHERE id = :userId”)
Flowable<User> getUserById(String userId);

存储库:

   @Override
public Flowable<User> getUser(int id) {
    getUserFromNet(id);
         //first emit cache data in db and after request complete   emit last update from net 
        return db.getUserById(id);

 }


 private Flowable<User> getUserFromNet(int id){
      api.getUserById(userId)
          .subscribeOn(Schedulers.io())
          .observeOn(Schedulers.io())
          //check your request
          .filter(statusPojo::getStatus)
           // save data to room
          .subscribe(new DisposableObserver<User>() {
                @Override
                public void onNext(User user) {
                     // save data to room
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e(e);
                }

                @Override
                public void onComplete() {


                }
            });
}

update_Answer2 对于反应式数据库和“如果数据库中没有数据调用网络服务” 根据这个 issue 最好使用 return a Flowable <List<T>>

并检查列表大小而不是 Flowable<T> 白色 swichIfEmpity 因为如果数据库中没有任何用户 Flowable<T> 就不会调用 onNext() 并且不会发出FlowableEmpity();

private Flowable<List<User>>  getUser(int id){
       return db.getUserById(id).
         /// if there is no user in the database get data from 
           .flatMp(userList-> 
           if(userList.size==0)
          api.getUserById(userId)
          .subscribeOn(Schedulers.io())
          //check your request
          .filter(statusPojo::getStatus)
           // save data to room
          .subscribe(new DisposableObserver<User>() {
                @Override
                public void onNext(User user) {
                     // save data to room
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e(e);
                }

                @Override
                public void onComplete() {


                }
            });
                return Flowable.just(data)
                );
}

Kotlin 方式 改造,分页(pagingRX androidx)和空间:

房道:

@Dao
abstract class UserDao   {

@Query("SELECT * FROM users ")
abstract fun findAll(): DataSource.Factory<Int, User>
}

存储库:

private fun getFromDB(pageSize:Int): Flowable<PagedList<User>> {
    return RxPagedListBuilder(userDao.findAll(), pageSize)
        .buildFlowable(BackpressureStrategy.LATEST)
}


private fun getApi(page: Int,pageSize: Int): Disposable {
    return api.getUserList("token", page = page,perPage = pageSize)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe { t1: List<User>?, t2: Throwable? ->
            t1?.let {
                if (it.isNotEmpty())
                    userDao.insert(it)
            }
        }
}

override fun  findAll(page: Int ,pageSize:Int ): 
Flowable<PagedList<User>> {
    return getFromDB(pageSize).doOnSubscribe { getApi(page,pageSize) }
}