RxJava:保留在抛出异常之前插入到 couchbase 数据库中的部分对象列表
RxJava : Retain partial list of objects which were inserted in the couchbase db before exception was thrown
Observable
.from(couchbaseDocuments)
.subscribeOn(Schedulers.io())
.flatMap(docToInsert->asyncBucket.insert(docToInsert))
.retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 5)).max(3).build())
.map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
.toBlocking()
.forEach(id->insertedIds.add(id));
要求:
- 在某个类型的couchbase中批量插入文档。
- 创建另一个文档,其中包含我们在第一步中插入的文档的 ID
- 如果第一步在我们停止插入的某个 id 处失败,那么第二个文档应该只有那些在异常发生之前插入的 id
- 调用是同步的
我是 RxJava 的新手。我写了上面的反应代码,但似乎我还没有清楚地理解一些概念。
我的想法是,末尾的 forEach 将始终获取发出的项目,如果发生异常,我将捕获它,然后使用 insertedIds 列表创建第二个文档。然而,该列表总是包含所有不满足我的要求的 ID。
任何人都可以解释一下代码有什么问题以及我如何实现上述要求吗?
retry
方法将重新订阅上游 Observable
。
在您的情况下,这意味着订阅 couchbaseDocuments
并可能尝试重新插入已成功插入的文档。
与其再次重试整个流,不如重试失败的插入:
Observable
.from(couchbaseDocuments)
.subscribeOn(Schedulers.io())
.flatMap(docToInsert->asyncBucket.insert(docToInsert).retryWhen(...))
.map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
.toBlocking()
.forEach(id->insertedIds.add(id));
本质上:你必须移动一个括号。
Observable
.from(couchbaseDocuments)
.subscribeOn(Schedulers.io())
.flatMap(docToInsert->asyncBucket.insert(docToInsert))
.retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 5)).max(3).build())
.map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
.toBlocking()
.forEach(id->insertedIds.add(id));
要求:
- 在某个类型的couchbase中批量插入文档。
- 创建另一个文档,其中包含我们在第一步中插入的文档的 ID
- 如果第一步在我们停止插入的某个 id 处失败,那么第二个文档应该只有那些在异常发生之前插入的 id
- 调用是同步的
我是 RxJava 的新手。我写了上面的反应代码,但似乎我还没有清楚地理解一些概念。 我的想法是,末尾的 forEach 将始终获取发出的项目,如果发生异常,我将捕获它,然后使用 insertedIds 列表创建第二个文档。然而,该列表总是包含所有不满足我的要求的 ID。
任何人都可以解释一下代码有什么问题以及我如何实现上述要求吗?
retry
方法将重新订阅上游 Observable
。
在您的情况下,这意味着订阅 couchbaseDocuments
并可能尝试重新插入已成功插入的文档。
与其再次重试整个流,不如重试失败的插入:
Observable
.from(couchbaseDocuments)
.subscribeOn(Schedulers.io())
.flatMap(docToInsert->asyncBucket.insert(docToInsert).retryWhen(...))
.map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
.toBlocking()
.forEach(id->insertedIds.add(id));
本质上:你必须移动一个括号。