在批量插入时获取 TemporaryFailureException
Getting TemporaryFailureException on batch insert
我正在向 CouchBase 批量插入数据,几分钟后出现此异常:
com.couchbase.client.java.error.TemporaryFailureException at
com.couchbase.client.java.CouchbaseAsyncBucket.call(CouchbaseAsyncBucket.java:445)
at
com.couchbase.client.java.CouchbaseAsyncBucket.call(CouchbaseAsyncBucket.java:426)
at rx.internal.operators.OperatorMap.onNext(OperatorMap.java:54)
at rx.observers.Subscribers.onNext(Subscribers.java:234) at
rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:222)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101) at
com.couchbase.client.core.endpoint.AbstractGenericHandler.call(AbstractGenericHandler.java:265)
at
rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
代码如下:
cluster = CouchbaseCluster.create();
Bucket bucket = cluster.openBucket();
int numBatchs = 10000;
int numDocsInBatch = 1000;
for (int j = 0; j < numBatchs; j++) {
long start = System.currentTimeMillis();
List<JsonDocument> documents = new ArrayList<>(numDocsInBatch);
for (int i = 0; i < numDocsInBatch; i++) {
String uniqueID = UUID.randomUUID().toString() + "_" + System.currentTimeMillis();
JsonObject user = JsonObject.
empty();
documents.add(JsonDocument.create(uniqueID, user));
}
Observable
.from(documents)
.flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(final JsonDocument docToInsert) {
return bucket.async().insert(docToInsert);
}
})
.last()
.toBlocking()
.single();
我做错了什么吗?
TemporaryFailureException
通常是因为服务器太忙,通常可以通过退避重试操作。
供参考:http://developer.couchbase.com/documentation/server/4.0/sdks/java-2.2/documents-bulk.html
我正在向 CouchBase 批量插入数据,几分钟后出现此异常:
com.couchbase.client.java.error.TemporaryFailureException at com.couchbase.client.java.CouchbaseAsyncBucket.call(CouchbaseAsyncBucket.java:445) at com.couchbase.client.java.CouchbaseAsyncBucket.call(CouchbaseAsyncBucket.java:426) at rx.internal.operators.OperatorMap.onNext(OperatorMap.java:54) at rx.observers.Subscribers.onNext(Subscribers.java:234) at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:222) at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101) at com.couchbase.client.core.endpoint.AbstractGenericHandler.call(AbstractGenericHandler.java:265) at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
代码如下:
cluster = CouchbaseCluster.create();
Bucket bucket = cluster.openBucket();
int numBatchs = 10000;
int numDocsInBatch = 1000;
for (int j = 0; j < numBatchs; j++) {
long start = System.currentTimeMillis();
List<JsonDocument> documents = new ArrayList<>(numDocsInBatch);
for (int i = 0; i < numDocsInBatch; i++) {
String uniqueID = UUID.randomUUID().toString() + "_" + System.currentTimeMillis();
JsonObject user = JsonObject.
empty();
documents.add(JsonDocument.create(uniqueID, user));
}
Observable
.from(documents)
.flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(final JsonDocument docToInsert) {
return bucket.async().insert(docToInsert);
}
})
.last()
.toBlocking()
.single();
我做错了什么吗?
TemporaryFailureException
通常是因为服务器太忙,通常可以通过退避重试操作。
供参考:http://developer.couchbase.com/documentation/server/4.0/sdks/java-2.2/documents-bulk.html