同步周期更新和创建、更新、删除操作
Sychronization of periodic update and create, update and delete operation
我使用 Flowable
链来定期调用用户任务的请求 (API)。
Flowable.interval(60, TimeUnit.SECONDS)
.flatMapCompletable(_tick ->
mUserRepository.getAllUsers()
.flatMapObservable(Observable::fromIterable)
.flatMap(user ->
downloadAndPersistTasks(user)
.subscribeOn(Schedulers.io())
)
.subscribeOn(Schedulers.io())
, false, 1
);
方法downloadAndPersistData
下载每个用户的当前任务,从数据库中删除所有旧任务并保存下载的任务。可以从服务器端更改用户的任务。
问题是下载时间比较长。
本案例:
- 下载任务 - 开始
- 插入任务 - API 调用 + 保存到数据库 - 开始
- 插入任务 - API 调用 + 保存到数据库 - 结束
- 下载任务 - 结束(没有插入任务的记录)
- 将下载的记录写入数据库 -> 插入的任务被 覆盖
我需要在执行插入 API 调用时禁用特定用户的数据下载,并在执行定期更新时禁用函数插入任务。
是否有任何 RxJava 解决方案或者是使用 Java 框架的本机同步原语的最佳选择?但是我不需要跳过定期更新,只是延迟它。
我假设任务插入步骤是在本地完成的,downloadAndPersistTasks()
。
让我们介绍一个类型联合:Either<L,R>
。它有静态工厂方法 Either.<L>createLeft( L value )
和 Either.<R>createRight( R value )
。它还具有 class 方法:isLeft()
、isRight()
和 getLeft()
/getRight()
来做自然的事情。
一个Subject
用于注入插入任务步骤:
PublishSubject<InsertTask> taskInserter = PublishSubject.create();
然后我们按照下面的方式组合它们:
Flowable.timer(60, SECONDS)
.flatMap( tick ->
Observable.fromIterable( mUserRepository.getAllUsers() ), 1 )
.map( t -> Either.createLeft(t) )
.mergeWith( taskInserter.map( i -> Either.createRight( i ) ), 1 )
.observeOn( scheduler )
.subscribe( ti -> {
if ( ti.isLeft() ) {
downloadAndPersistTasks( ti.getLeft() );
} else {
insertTask( ti.getRight() );
}
);
flatMap()
的第二个参数确保一次只处理一个用户,mergeWith()
运算符也是如此。合并第二个流确保一次只执行一个操作,observeOn()
运算符将所有操作放在同一个线程上,因此不会发生争用。
如果您需要更高的并行度或更细粒度的控制,您可能需要为每个用户引入一个可观察对象。
我使用 Flowable
链来定期调用用户任务的请求 (API)。
Flowable.interval(60, TimeUnit.SECONDS)
.flatMapCompletable(_tick ->
mUserRepository.getAllUsers()
.flatMapObservable(Observable::fromIterable)
.flatMap(user ->
downloadAndPersistTasks(user)
.subscribeOn(Schedulers.io())
)
.subscribeOn(Schedulers.io())
, false, 1
);
方法downloadAndPersistData
下载每个用户的当前任务,从数据库中删除所有旧任务并保存下载的任务。可以从服务器端更改用户的任务。
问题是下载时间比较长。
本案例:
- 下载任务 - 开始
- 插入任务 - API 调用 + 保存到数据库 - 开始
- 插入任务 - API 调用 + 保存到数据库 - 结束
- 下载任务 - 结束(没有插入任务的记录)
- 将下载的记录写入数据库 -> 插入的任务被 覆盖
我需要在执行插入 API 调用时禁用特定用户的数据下载,并在执行定期更新时禁用函数插入任务。
是否有任何 RxJava 解决方案或者是使用 Java 框架的本机同步原语的最佳选择?但是我不需要跳过定期更新,只是延迟它。
我假设任务插入步骤是在本地完成的,downloadAndPersistTasks()
。
让我们介绍一个类型联合:Either<L,R>
。它有静态工厂方法 Either.<L>createLeft( L value )
和 Either.<R>createRight( R value )
。它还具有 class 方法:isLeft()
、isRight()
和 getLeft()
/getRight()
来做自然的事情。
一个Subject
用于注入插入任务步骤:
PublishSubject<InsertTask> taskInserter = PublishSubject.create();
然后我们按照下面的方式组合它们:
Flowable.timer(60, SECONDS)
.flatMap( tick ->
Observable.fromIterable( mUserRepository.getAllUsers() ), 1 )
.map( t -> Either.createLeft(t) )
.mergeWith( taskInserter.map( i -> Either.createRight( i ) ), 1 )
.observeOn( scheduler )
.subscribe( ti -> {
if ( ti.isLeft() ) {
downloadAndPersistTasks( ti.getLeft() );
} else {
insertTask( ti.getRight() );
}
);
flatMap()
的第二个参数确保一次只处理一个用户,mergeWith()
运算符也是如此。合并第二个流确保一次只执行一个操作,observeOn()
运算符将所有操作放在同一个线程上,因此不会发生争用。
如果您需要更高的并行度或更细粒度的控制,您可能需要为每个用户引入一个可观察对象。