使用 JavaRX 将文档插入 couchbase 时抛出 DocumentAlreadyExistsException 异常时如何继续下一个文档?

How to proceed to next document when DocumentAlreadyExistsException exception is thrown while inserting documents into couchbase using JavaRX?

我正在处理读取文档、转换文档并将其写入 coucbase 的批处理作业。我正在使用 Java 批量插入文档。我不想使用 upsert,因为我想记录文档是否已存在于 couchbase 中并调用 insert。请在下面找到我的代码。当文档已经存在时,将抛出 DocumentAlreadyExistsException 并且作业将停止而不是继续处理下一个文档。如何处理这个问题?

代码:

    public void insertAll(Collection<JsonDocument> documents) {

    Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(final JsonDocument docToInsert) {
            return couchbaseConfig.catalogBucket().async().insert(docToInsert)
                    .doOnError((Throwable throwable) -> log.error(
                            "Exception {} occured while inerting document {} to cb", throwable.getMessage(),
                            docToInsert));
        }
    }).last().toBlocking().single();

}

异常:

com.couchbase.client.java.error.DocumentAlreadyExistsException: null
at com.couchbase.client.java.bucket.api.Mutate.call(Mutate.java:154) ~[java-client-2.7.4.jar:na]
at com.couchbase.client.java.bucket.api.Mutate.call(Mutate.java:132) ~[java-client-2.7.4.jar:na]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69) ~[rxjava-1.3.8.jar:1.3.8]
at rx.observers.Subscribers.onNext(Subscribers.java:235) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.SingleProducer.request(SingleProducer.java:65) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:211) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:103) ~[rxjava-1.3.8.jar:1.3.8]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.completeResponse(AbstractGenericHandler.java:508) ~[core-io-1.7.4.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.access[=11=]0(AbstractGenericHandler.java:86) ~[core-io-1.7.4.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.call(AbstractGenericHandler.java:526) ~[core-io-1.7.4.jar:na]
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.3.8.jar:1.3.8]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.InsertResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73) ~[rxjava-1.3.8.jar:1.3.8]
... 21 common frames omitted

你可以使用onErrorResumeNext来抑制错误,像这样:

Observable.from(docs)
  .flatMap(docToInsert ->
    bucket.insert(docToInsert)
      .doOnError(t -> System.out.println("oops, error inserting " + docToInsert.id() + " : " + t))
      .onErrorResumeNext(t ->
        t instanceof DocumentAlreadyExistsException ? Observable.empty() : Observable.error(t))
  ).toCompletable().await();

请注意 toCompleteable().await() 即使没有发射任何物品也能正常工作; single() 如果所有文档都已存在,则会抛出异常。