RxJava takeWhile seems to be ignored. Should only execute for the first page
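I am trying to upload database entities in bulk using WorkManager. The work itself runs fine, but the service that performs the paging does not behave as expected.
What I want to do:
- Page through the database entries I need to upload (last timestamp on the server is newer than on the client). Fetch the pages as List
- Map the entries to API objects
- Pass the entities to another method that triggers the API call once, sending the list of entries
Problems:
- The paging seems to ignore the takeWhile operator and keeps running until the maximum iteration limit is reached
- The uploadEntitiesFor call is never executed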
private void pageEntityAndUpload(long lastTimestamp) {
    Disposable ax = Observable.range(0, MAX_ITERATION)
            .doOnNext(integer -> {
                logInformation.logInformation(TAG, null,
                        String.format("Paging %s for user: %s, Offset (page:%s): %s, Limit: %s",
                                resourceName, userId, integer, integer * limit, limit)
                );
            })
            .concatMap(integer -> pageSupplierFunction.page(userId, lastTimestamp, integer * limit, limit))
            .doOnNext(dbEntities -> {
                logInformation.logInformation(TAG, null,
                        String.format("Found %s: %s", resourceName, dbEntities.stream()
                                .map(EntityBase::getId)
                                .collect(Collectors.joining(",")))
                );
            })
            .takeWhile(dbEntities -> !dbEntities.isEmpty())
            .flatMapIterable(a -> a)
            .map(e -> entityToApiMapper.apply(e))
            .toList()
            //.subscribeOn(Schedulers.io())
            .subscribeOn(Schedulers.single())
            .subscribe(
                    apiEntities -> uploadEntitiesFor(apiEntities),
                    throwable -> logError.logError(TAG, throwable, String.format("Failed to read and convert %s to api objects.", resourceName))
            );
}
Interfaces:
private PageSupplierFunction<DBT> pageSupplierFunction;
...
@FunctionalInterface
public interface PageSupplierFunction<T> {
    public Observable<List<T>> page(String userId, long lastTimestamp, int offset, int limit);
}
private Function<DBT, AT> entityToApiMapper;
The page supplier comes from a Room Dao, for example (it is an internal app, and I need to store things per user...):
@Query("SELECT * FROM partner " +
"WHERE user_id = ....)" +
"LIMIT :limit OFFSET :offset")
Observable<List<Partner>> pagePartnerSuggestionsFor(String userId, long lastTimestamp, int offset, int limit);
Log output during execution:
2021-05-18 15:41:33.684 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:0): 0, Limit: 1000
2021-05-18 15:41:34.189 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:1): 1000, Limit: 1000
2021-05-18 15:41:34.191 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:2): 2000, Limit: 1000
2021-05-18 15:41:34.206 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:3): 3000, Limit: 1000
2021-05-18 15:41:34.207 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:4): 4000, Limit: 1000
2021-05-18 15:41:34.209 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:5): 5000, Limit: 1000
2021-05-18 15:41:34.217 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:6): 6000, Limit: 1000
2021-05-18 15:41:34.219 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:7): 7000, Limit: 1000
2021-05-18 15:41:34.226 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:8): 8000, Limit: 1000
2021-05-18 15:41:34.230 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:9): 9000, Limit: 1000
2021-05-18 15:41:34.240 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:10): 10000, Limit: 1000
2021-05-18 15:41:34.247 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:11): 11000, Limit: 1000
2021-05-18 15:41:34.253 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:12): 12000, Limit: 1000
2021-05-18 15:41:34.254 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:13): 13000, Limit: 1000
2021-05-18 15:41:34.255 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:14): 14000, Limit: 1000
2021-05-18 15:41:34.258 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:15): 15000, Limit: 1000
2021-05-18 15:41:34.259 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:16): 16000, Limit: 1000
2021-05-18 15:41:34.262 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:17): 17000, Limit: 1000
2021-05-18 15:41:34.265 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:18): 18000, Limit: 1000
2021-05-18 15:41:34.267 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:19): 19000, Limit: 1000
2021-05-18 15:41:34.268 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:20): 20000, Limit: 1000
2021-05-18 15:41:34.270 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:21): 21000, Limit: 1000
2021-05-18 15:41:34.271 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:22): 22000, Limit: 1000
2021-05-18 15:41:34.276 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:23): 23000, Limit: 1000
2021-05-18 15:41:34.279 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:24): 24000, Limit: 1000
2021-05-18 15:41:34.283 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:25): 25000, Limit: 1000
2021-05-18 15:41:34.285 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:26): 26000, Limit: 1000
2021-05-18 15:41:34.287 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:27): 27000, Limit: 1000
2021-05-18 15:41:34.291 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:28): 28000, Limit: 1000
2021-05-18 15:41:34.294 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:29): 29000, Limit: 1000
2021-05-18 15:41:34.298 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:30): 30000, Limit: 1000
2021-05-18 15:41:34.307 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:31): 31000, Limit: 1000
2021-05-18 15:41:34.311 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:32): 32000, Limit: 1000
2021-05-18 15:41:34.314 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:33): 33000, Limit: 1000
2021-05-18 15:41:34.316 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:34): 34000, Limit: 1000
2021-05-18 15:41:34.316 25902-26058/... I/PartnerUploadService: Found Partner: 957690320dee4f7983070a1fb630f487
2021-05-18 15:41:34.317 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:35): 35000, Limit: 1000
2021-05-18 15:41:34.318 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:36): 36000, Limit: 1000
2021-05-18 15:41:34.319 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:37): 37000, Limit: 1000
2021-05-18 15:41:34.321 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:38): 38000, Limit: 1000
2021-05-18 15:41:34.322 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:39): 39000, Limit: 1000
2021-05-18 15:41:34.335 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:40): 40000, Limit: 1000
2021-05-18 15:41:34.337 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:41): 41000, Limit: 1000
2021-05-18 15:41:34.342 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:42): 42000, Limit: 1000
2021-05-18 15:41:34.343 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:43): 43000, Limit: 1000
2021-05-18 15:41:34.346 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:44): 44000, Limit: 1000
2021-05-18 15:41:34.359 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:45): 45000, Limit: 1000
2021-05-18 15:41:34.361 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:46): 46000, Limit: 1000
2021-05-18 15:41:34.362 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:47): 47000, Limit: 1000
2021-05-18 15:41:34.363 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:48): 48000, Limit: 1000
2021-05-18 15:41:34.369 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:49): 49000, Limit: 1000
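I currently have only one partner to upload. The upload is never triggered, and the paging keeps running until the limit is reached instead of stopping after the first page.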
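You are supplying concatMap with observables that are processed sequentially. Each subsequent observable is processed only after the previous one has completed.
The problem is that the Observable returned by your Dao never actually completes: Room keeps observing the table until the subscriber is disposed.
In your case, you should simply change the Dao method to return Single instead of Observable (and also change concatMap to concatMapSingle).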
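To make the intended result concrete, here is a minimal sketch in plain Java, deliberately without RxJava, of what the chain should compute once every page lookup is a one-shot result. It is a simplified model and not the asker's code: PagingSketch, PageSource, collectPages and inMemory are hypothetical names, and the in-memory list only stands in for the Room query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified, RxJava-free model of the intended paging. All names are hypothetical.
final class PagingSketch {

    /** One-shot page lookup: exactly one list per call, the way a Single would deliver it. */
    interface PageSource<T> {
        List<T> page(int offset, int limit);
    }

    /** Reads pages until the first empty one and maps every entity along the way. */
    static <T, R> List<R> collectPages(PageSource<T> source, Function<T, R> mapper,
                                       int limit, int maxIterations) {
        List<R> result = new ArrayList<>();
        for (int page = 0; page < maxIterations; page++) {
            List<T> entities = source.page(page * limit, limit);
            if (entities.isEmpty()) {
                break; // plays the role of takeWhile(list -> !list.isEmpty())
            }
            for (T entity : entities) {
                result.add(mapper.apply(entity)); // flatMapIterable + map
            }
        }
        return result; // toList(): the value handed to the upload call exactly once
    }

    /** In-memory stand-in for a query with LIMIT :limit OFFSET :offset. */
    static <T> PageSource<T> inMemory(List<T> rows) {
        return (offset, limit) -> {
            int from = Math.min(offset, rows.size());
            int to = Math.min(offset + limit, rows.size());
            return new ArrayList<>(rows.subList(from, to));
        };
    }

    public static void main(String[] args) {
        PageSource<String> source = inMemory(List.of("a", "b", "c"));
        List<String> apiObjects = collectPages(source, String::toUpperCase, 2, 50);
        System.out.println(apiObjects); // [A, B, C]
    }
}
```

The loop only terminates because each page call returns and is then finished. In the RxJava chain that property comes from a Dao method returning Single combined with concatMapSingle, as described above.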
Example
I deliberately do not call onComplete on the emitter, to simulate your situation.
Observable.range(0, 5)
    .concatMap { id ->
        Observable.create<Int> { emitter ->
            emitter.onNext(id)
            // emitter.onComplete()
        }
    }
    .subscribe(
        { println("Next-$it") },
        { println("Error") },
        { println("Complete") }
    )
Result:
Next-0
This can be fixed by changing Observable to Single.
Observable.range(0, 5)
    .concatMapSingle { id ->
        Single.create<Int> { emitter ->
            emitter.onSuccess(id)
        }
    }
    .subscribe(
        { println("Next-$it") },
        { println("Error") },
        { println("Complete") }
    )
Result:
Next-0
Next-1
Next-2
Next-3
Next-4
Complete