在批量插入期间,只有一半的 BinaryDocument 被插入

Only half of the BinaryDocument(s) are getting inserted during bulk insert

我在插入过程中遇到了一个奇怪的问题。我有两种类型的文档 - JSON 和 BinaryDocument。我正在执行限制为批量大小的批量插入操作。

该操作适用于 JSON 个文档。但是如果我上传,比如 100 个文档,那么对于 BinaryDocument,只有 50 个被上传。每次只有一半的文档被加载到数据库中。

这是我的 JSON 文档插入代码:

public void createMultipleCustomerDocuments(String docId, Customer myCust, long numDocs, int batchSize) {

        Gson gson = new GsonBuilder().create();
        JsonObject content = JsonObject.fromJson(gson.toJson(myCust));
        JsonDocument document = JsonDocument.create(docId, content);
        jsonDocuments.add(document);
        documentCounter.incrementAndGet();
        System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());
        if(documentCounter.get() >= batchSize){
            System.out.println("Document counter: " + documentCounter.get());
            Observable
            .from(jsonDocuments)
            .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
                public Observable<JsonDocument> call(final JsonDocument docToInsert) {
                    return theBucket.async().upsert(docToInsert);
                }
            })
            .last()
            .toList()
            .toBlocking()
            .single();
            jsonDocuments.clear();
            documentCounter.set(0);
        }


    }

这完全没问题。我插入没有问题。

这是我的 BinaryDocument 插入代码:

public void createMultipleCustomerDocuments(final String docId, ByteBuffer myCust, long numDocs, int batchSize) throws BackpressureException, InterruptedException {
        ByteBuf buffer = Unpooled.wrappedBuffer(myCust);
        binaryDocuments.add(buffer);
        documentCounter.incrementAndGet();

        System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());

        if(documentCounter.get() >= batchSize){
            System.out.println("Document counter: " + documentCounter.get() + " Binary Document list size: " + binaryDocuments.size());
            Observable
            .from(binaryDocuments)
            .flatMap(new Func1<ByteBuf, Observable<BinaryDocument>>() {
                public Observable<BinaryDocument> call(final ByteBuf docToInsert) {
                    //docToInsert.retain();

                    return theBucket.async().upsert(BinaryDocument.create(docId, docToInsert));

                } 
            })
            .last()
            .toList()
            .toBlocking()
            .single();

            binaryDocuments.clear();

            documentCounter.set(0);
        }
     }

这失败了。插入的文档数量正好是一半。甚至数字的打印方式也与 JSON 文档函数的数字完全相同。 documentCounter 显示正确的数字。但是插入数据库的文档数量只有显示的一半。

有人可以帮我吗?

您似乎使用相同的文档 ID(即批次中最后一个成员的 docId)在同一批次中创建所有文档

.BinaryDocument.create(docId, docToInsert)

您应该在 if 语句之外构建 BinaryDocument 数组(就像您对 JsonDocument 版本所做的那样)。像

public void createMultipleCustomerDocuments(final String docId, ByteBuffer myCust, int batchSize) throws BackpressureException, InterruptedException {
    //  numDocs is redundant
    ByteBuf buffer = Unpooled.wrappedBuffer(myCust);
    binaryDocuments.add(BinaryDocument.create(docId, buffer)); // ArrayList<BinaryDocument> type
    documentCounter.incrementAndGet();

    System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());

    if(documentCounter.get() >= batchSize){
        System.out.println("Document counter: " + documentCounter.get() + " Binary Document list size: " + binaryDocuments.size());
        Observable
        .from(binaryDocuments)
        .flatMap(new Func1<BinaryDocument, Observable<BinaryDocument>>() {
            public Observable<BinaryDocument> call(final BinaryDocument docToInsert) {
                return theBucket.async().upsert(docToInsert);
            } 
        })
        .last()
        .toBlocking()
        .single();
        binaryDocuments.clear();
        documentCounter.set(0);
    }
}

应该可以。