RxJava takeWhile 似乎被忽略了。应该只在第一页执行

RxJava takeWhile seems ignored. Should only execute for the first page

我正在尝试使用 workmanager 批量上传数据库实体。工作正常,但是执行分页的服务没有按预期工作。

我想做什么:

  1. 对我需要上传的数据库条目进行分页(服务器上的最后一个时间戳比客户端上的更新)。获取页面 List
  2. 将条目映射到 api 个对象
  3. 将实体传递给另一个方法,该方法触发 API 调用一次,发送条目列表

问题:

  1. 分页似乎忽略了 takeWhile 运算符并执行直到达到最大执行限制
  2. 从未执行 uploadEntitiesFor 调用
private void pageEntityAndUpload(long lastTimestamp) {
    Disposable ax = Observable.range(0, MAX_ITERATION)
            .doOnNext(integer -> {
                logInformation.logInformation(TAG, null,
                        String.format("Paging %s for user: %s, Offset (page:%s): %s, Limit: %s",
                                resourceName, userId, integer, integer * limit, limit)
                );
            })
            .concatMap(integer -> pageSupplierFunction.page(userId, lastTimestamp, integer * limit, limit))
            .doOnNext(dbEntities -> {
                logInformation.logInformation(TAG, null,
                        String.format("Found %s: %s", resourceName, dbEntities.stream()
                                .map(EntityBase::getId)
                                .collect(Collectors.joining(",")))
                );
            })
            .takeWhile(dbEntities -> !dbEntities.isEmpty())
            .flatMapIterable(a -> a)
            .map(e -> entityToApiMapper.apply(e))
            .toList()
            //.subscribeOn(Schedulers.io())
            .subscribeOn(Schedulers.single())
            .subscribe(
                    apiEntities -> uploadEntitiesFor(apiEntities),
                    throwable -> logError.logError(TAG, throwable, String.format("Failed to read and convert %s to api objects.", resourceName))
            );
}

接口:

private PageSupplierFunction<DBT> pageSupplierFunction;
...
@FunctionalInterface
public interface PageSupplierFunction<T> {
    public Observable<List<T>> page(String userId, long lastTimestamp, int offset, int limit);
}
private Function<DBT, AT> entityToApiMapper;

页面供应商来自Room Dao,例如(它是一个内部应用程序,我需要存储每个用户的东西...):

@Query("SELECT * FROM partner " +
        "WHERE user_id = ....)" +
        "LIMIT :limit OFFSET :offset")
Observable<List<Partner>> pagePartnerSuggestionsFor(String userId, long lastTimestamp, int offset, int limit);

执行过程中,日志输出:

2021-05-18 15:41:33.684 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:0): 0, Limit: 1000
2021-05-18 15:41:34.189 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:1): 1000, Limit: 1000
2021-05-18 15:41:34.191 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:2): 2000, Limit: 1000
2021-05-18 15:41:34.206 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:3): 3000, Limit: 1000
2021-05-18 15:41:34.207 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:4): 4000, Limit: 1000
2021-05-18 15:41:34.209 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:5): 5000, Limit: 1000
2021-05-18 15:41:34.217 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:6): 6000, Limit: 1000
2021-05-18 15:41:34.219 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:7): 7000, Limit: 1000
2021-05-18 15:41:34.226 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:8): 8000, Limit: 1000
2021-05-18 15:41:34.230 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:9): 9000, Limit: 1000
2021-05-18 15:41:34.240 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:10): 10000, Limit: 1000
2021-05-18 15:41:34.247 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:11): 11000, Limit: 1000
2021-05-18 15:41:34.253 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:12): 12000, Limit: 1000
2021-05-18 15:41:34.254 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:13): 13000, Limit: 1000
2021-05-18 15:41:34.255 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:14): 14000, Limit: 1000
2021-05-18 15:41:34.258 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:15): 15000, Limit: 1000
2021-05-18 15:41:34.259 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:16): 16000, Limit: 1000
2021-05-18 15:41:34.262 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:17): 17000, Limit: 1000
2021-05-18 15:41:34.265 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:18): 18000, Limit: 1000
2021-05-18 15:41:34.267 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:19): 19000, Limit: 1000
2021-05-18 15:41:34.268 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:20): 20000, Limit: 1000
2021-05-18 15:41:34.270 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:21): 21000, Limit: 1000
2021-05-18 15:41:34.271 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:22): 22000, Limit: 1000
2021-05-18 15:41:34.276 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:23): 23000, Limit: 1000
2021-05-18 15:41:34.279 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:24): 24000, Limit: 1000
2021-05-18 15:41:34.283 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:25): 25000, Limit: 1000
2021-05-18 15:41:34.285 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:26): 26000, Limit: 1000
2021-05-18 15:41:34.287 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:27): 27000, Limit: 1000
2021-05-18 15:41:34.291 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:28): 28000, Limit: 1000
2021-05-18 15:41:34.294 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:29): 29000, Limit: 1000
2021-05-18 15:41:34.298 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:30): 30000, Limit: 1000
2021-05-18 15:41:34.307 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:31): 31000, Limit: 1000
2021-05-18 15:41:34.311 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:32): 32000, Limit: 1000
2021-05-18 15:41:34.314 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:33): 33000, Limit: 1000
2021-05-18 15:41:34.316 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:34): 34000, Limit: 1000
2021-05-18 15:41:34.316 25902-26058/... I/PartnerUploadService: Found Partner: 957690320dee4f7983070a1fb630f487
2021-05-18 15:41:34.317 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:35): 35000, Limit: 1000
2021-05-18 15:41:34.318 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:36): 36000, Limit: 1000
2021-05-18 15:41:34.319 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:37): 37000, Limit: 1000
2021-05-18 15:41:34.321 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:38): 38000, Limit: 1000
2021-05-18 15:41:34.322 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:39): 39000, Limit: 1000
2021-05-18 15:41:34.335 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:40): 40000, Limit: 1000
2021-05-18 15:41:34.337 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:41): 41000, Limit: 1000
2021-05-18 15:41:34.342 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:42): 42000, Limit: 1000
2021-05-18 15:41:34.343 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:43): 43000, Limit: 1000
2021-05-18 15:41:34.346 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:44): 44000, Limit: 1000
2021-05-18 15:41:34.359 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:45): 45000, Limit: 1000
2021-05-18 15:41:34.361 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:46): 46000, Limit: 1000
2021-05-18 15:41:34.362 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:47): 47000, Limit: 1000
2021-05-18 15:41:34.363 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:48): 48000, Limit: 1000
2021-05-18 15:41:34.369 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1, Offset (page:49): 49000, Limit: 1000

我现在只有一个合作伙伴可以上传。上传未触发,分页仍在执行,直到达到限制并且在第一页后不会停止。

您向 concatMap 提供将按顺序处理的可观察对象。仅当前一个已完成时,才会处理每个下一个可观察对象。

问题是,您的 Dao return 可观察,实际上并未完成 - 房间 table 将被观察,除非订阅者被处置。

在您的情况下,您应该只通过将 Dao 方法更改为 return Single 而不是 Obervable(并且还更改 concatMapconcatMapSingle.)


例子

我故意没有在 emitter 上调用 onComplete 来模拟你的情况。

Observable.range(0, 5)
    .concatMap { id ->
        Observable.create<Int> { emitter ->
            emitter.onNext(id)
            // emitter.onComplete()
        }
    }
    .subscribe(
        { println("Next-$it") },
        { println("Error") },
        { println("Complete") }
    )

结果:

Next-0

可以通过将 Observable 更改为 Single 来解决。

Observable.range(0, 5)
    .concatMapSingle { id ->
        Single.create<Int> { emitter ->
            emitter.onSuccess(id)
        }
    }
    .subscribe(
        { println("Next-$it") },
        { println("Error") },
        { println("Complete") }
    )

结果:

Next-0
Next-1
Next-2
Next-3
Next-4
Complete