在批量插入期间,只有一半的 BinaryDocument 被插入
Only half of the BinaryDocument(s) are getting inserted during bulk insert
我在插入过程中遇到了一个奇怪的问题。我有两种类型的文档 - JSON 和 BinaryDocument。我正在执行限制为批量大小的批量插入操作。
该操作适用于 JSON 个文档。但是如果我上传,比如 100 个文档,那么对于 BinaryDocument,只有 50 个被上传。每次只有一半的文档被加载到数据库中。
这是我的 JSON 文档插入代码:
public void createMultipleCustomerDocuments(String docId, Customer myCust, long numDocs, int batchSize) {
Gson gson = new GsonBuilder().create();
JsonObject content = JsonObject.fromJson(gson.toJson(myCust));
JsonDocument document = JsonDocument.create(docId, content);
jsonDocuments.add(document);
documentCounter.incrementAndGet();
System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());
if(documentCounter.get() >= batchSize){
System.out.println("Document counter: " + documentCounter.get());
Observable
.from(jsonDocuments)
.flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
public Observable<JsonDocument> call(final JsonDocument docToInsert) {
return theBucket.async().upsert(docToInsert);
}
})
.last()
.toList()
.toBlocking()
.single();
jsonDocuments.clear();
documentCounter.set(0);
}
}
这完全没问题。我插入没有问题。
这是我的 BinaryDocument 插入代码:
public void createMultipleCustomerDocuments(final String docId, ByteBuffer myCust, long numDocs, int batchSize) throws BackpressureException, InterruptedException {
ByteBuf buffer = Unpooled.wrappedBuffer(myCust);
binaryDocuments.add(buffer);
documentCounter.incrementAndGet();
System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());
if(documentCounter.get() >= batchSize){
System.out.println("Document counter: " + documentCounter.get() + " Binary Document list size: " + binaryDocuments.size());
Observable
.from(binaryDocuments)
.flatMap(new Func1<ByteBuf, Observable<BinaryDocument>>() {
public Observable<BinaryDocument> call(final ByteBuf docToInsert) {
//docToInsert.retain();
return theBucket.async().upsert(BinaryDocument.create(docId, docToInsert));
}
})
.last()
.toList()
.toBlocking()
.single();
binaryDocuments.clear();
documentCounter.set(0);
}
}
这失败了。插入的文档数量正好是一半。甚至数字的打印方式也与 JSON 文档函数的数字完全相同。 documentCounter 显示正确的数字。但是插入数据库的文档数量只有显示的一半。
有人可以帮我吗?
您似乎使用相同的文档 ID(即批次中最后一个成员的 docId)在同一批次中创建所有文档
.BinaryDocument.create(docId, docToInsert)
您应该在 if 语句之外构建 BinaryDocument 数组(就像您对 JsonDocument 版本所做的那样)。像
public void createMultipleCustomerDocuments(final String docId, ByteBuffer myCust, int batchSize) throws BackpressureException, InterruptedException {
// numDocs is redundant
ByteBuf buffer = Unpooled.wrappedBuffer(myCust);
binaryDocuments.add(BinaryDocument.create(docId, buffer)); // ArrayList<BinaryDocument> type
documentCounter.incrementAndGet();
System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());
if(documentCounter.get() >= batchSize){
System.out.println("Document counter: " + documentCounter.get() + " Binary Document list size: " + binaryDocuments.size());
Observable
.from(binaryDocuments)
.flatMap(new Func1<BinaryDocument, Observable<BinaryDocument>>() {
public Observable<BinaryDocument> call(final BinaryDocument docToInsert) {
return theBucket.async().upsert(docToInsert);
}
})
.last()
.toBlocking()
.single();
binaryDocuments.clear();
documentCounter.set(0);
}
}
应该可以。
我在插入过程中遇到了一个奇怪的问题。我有两种类型的文档 - JSON 和 BinaryDocument。我正在执行限制为批量大小的批量插入操作。
该操作适用于 JSON 个文档。但是如果我上传,比如 100 个文档,那么对于 BinaryDocument,只有 50 个被上传。每次只有一半的文档被加载到数据库中。
这是我的 JSON 文档插入代码:
public void createMultipleCustomerDocuments(String docId, Customer myCust, long numDocs, int batchSize) {
Gson gson = new GsonBuilder().create();
JsonObject content = JsonObject.fromJson(gson.toJson(myCust));
JsonDocument document = JsonDocument.create(docId, content);
jsonDocuments.add(document);
documentCounter.incrementAndGet();
System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());
if(documentCounter.get() >= batchSize){
System.out.println("Document counter: " + documentCounter.get());
Observable
.from(jsonDocuments)
.flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
public Observable<JsonDocument> call(final JsonDocument docToInsert) {
return theBucket.async().upsert(docToInsert);
}
})
.last()
.toList()
.toBlocking()
.single();
jsonDocuments.clear();
documentCounter.set(0);
}
}
这完全没问题。我插入没有问题。
这是我的 BinaryDocument 插入代码:
public void createMultipleCustomerDocuments(final String docId, ByteBuffer myCust, long numDocs, int batchSize) throws BackpressureException, InterruptedException {
ByteBuf buffer = Unpooled.wrappedBuffer(myCust);
binaryDocuments.add(buffer);
documentCounter.incrementAndGet();
System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());
if(documentCounter.get() >= batchSize){
System.out.println("Document counter: " + documentCounter.get() + " Binary Document list size: " + binaryDocuments.size());
Observable
.from(binaryDocuments)
.flatMap(new Func1<ByteBuf, Observable<BinaryDocument>>() {
public Observable<BinaryDocument> call(final ByteBuf docToInsert) {
//docToInsert.retain();
return theBucket.async().upsert(BinaryDocument.create(docId, docToInsert));
}
})
.last()
.toList()
.toBlocking()
.single();
binaryDocuments.clear();
documentCounter.set(0);
}
}
这失败了。插入的文档数量正好是一半。甚至数字的打印方式也与 JSON 文档函数的数字完全相同。 documentCounter 显示正确的数字。但是插入数据库的文档数量只有显示的一半。
有人可以帮我吗?
您似乎使用相同的文档 ID(即批次中最后一个成员的 docId)在同一批次中创建所有文档
.BinaryDocument.create(docId, docToInsert)
您应该在 if 语句之外构建 BinaryDocument 数组(就像您对 JsonDocument 版本所做的那样)。像
public void createMultipleCustomerDocuments(final String docId, ByteBuffer myCust, int batchSize) throws BackpressureException, InterruptedException {
// numDocs is redundant
ByteBuf buffer = Unpooled.wrappedBuffer(myCust);
binaryDocuments.add(BinaryDocument.create(docId, buffer)); // ArrayList<BinaryDocument> type
documentCounter.incrementAndGet();
System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());
if(documentCounter.get() >= batchSize){
System.out.println("Document counter: " + documentCounter.get() + " Binary Document list size: " + binaryDocuments.size());
Observable
.from(binaryDocuments)
.flatMap(new Func1<BinaryDocument, Observable<BinaryDocument>>() {
public Observable<BinaryDocument> call(final BinaryDocument docToInsert) {
return theBucket.async().upsert(docToInsert);
}
})
.last()
.toBlocking()
.single();
binaryDocuments.clear();
documentCounter.set(0);
}
}
应该可以。