RxJava:保留在抛出异常之前插入到 couchbase 数据库中的部分对象列表

RxJava : Retain partial list of objects which were inserted in the couchbase db before exception was thrown

 Observable
        .from(couchbaseDocuments)
        .subscribeOn(Schedulers.io())
        .flatMap(docToInsert->asyncBucket.insert(docToInsert))
        .retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 5)).max(3).build())
        .map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
        .toBlocking()
        .forEach(id->insertedIds.add(id));

要求:

  1. 在某个类型的couchbase中批量插入文档。
  2. 创建另一个文档,其中包含我们在第一步中插入的文档的 ID
  3. 如果第一步在我们停止插入的某个 id 处失败,那么第二个文档应该只有那些在异常发生之前插入的 id
  4. 调用是同步的

我是 RxJava 的新手。我写了上面的反应代码,但似乎我还没有清楚地理解一些概念。 我的想法是,末尾的 forEach 将始终获取发出的项目,如果发生异常,我将捕获它,然后使用 insertedIds 列表创建第二个文档。然而,该列表总是包含所有不满足我的要求的 ID。

任何人都可以解释一下代码有什么问题以及我如何实现上述要求吗?

retry 方法将重新订阅上游 Observable

在您的情况下,这意味着订阅 couchbaseDocuments 并可能尝试重新插入已成功插入的文档。

与其再次重试整个流,不如重试失败的插入:

 Observable
        .from(couchbaseDocuments)
        .subscribeOn(Schedulers.io())
        .flatMap(docToInsert->asyncBucket.insert(docToInsert).retryWhen(...))
        .map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
        .toBlocking()
        .forEach(id->insertedIds.add(id));

本质上:你必须移动一个括号。