MongoDB 写入问题:不同的数据库显示相同文档的不同计数
MongoDB Write Issue: Different DBs showing different count for same documents
我正在尝试在 MongoDB 的 2 个数据库中的不同集合中批量插入某些文档。
MongoClient mongoClient2 = this.getMongoClient();
MongoDatabase currentDB = mongoClient2.getDatabase(splits[0]);
MongoCollection<Document> currentCollectionNew = currentDB.getCollection(splits[1])
.withWriteConcern(WriteConcern.MAJORITY.withJournal(true))
.withReadConcern(ReadConcern.MAJORITY);
BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
bulkWriteOptions.ordered(true);
bulkWriteOptions.bypassDocumentValidation(true);
try {
BulkWriteResult bulkWriteResult = urrentCollectionNew.bulkWrite(listDoc,
bulkWriteOptions);
logger.info("bulkWriteResult inserted count in MAIN: " + bulkWriteResult.getInsertedCount());
logger.info("bulkWriteResult modified count in MAIN: " + bulkWriteResult.getModifiedCount());
logger.info("bulkWriteResult matched count in MAIN : " + bulkWriteResult.getMatchedCount());
logger.info("bulkWriteResult deleted count in MAIN : " + bulkWriteResult.getDeletedCount());
logger.info("bulkWriteResult upserted count in MAIN : " + bulkWriteResult.getUpserts().size());
logger.info("bulkWriteResult was acknowledged in MAIN : " + bulkWriteResult.wasAcknowledged());
mongoClient2.close()
} catch (Exception e) {
logger.warn("Error in bulkWriting main DB: {} ", e.getMessage());
logger.error(e.getMessage(), e);
}
MongoCollection<Document> mongoStageCollection = objFactory.getCollectionObject(resourceType, true);
String[] splitsStage = mongoStageCollection.getNamespace().getFullName().split("\.");
MongoClient mongoClient3 = this.getMongoClient();
MongoDatabase newStageDB = mongoClient3.getDatabase(splitsStage[0]);
MongoCollection<Document> stageCollectionNew = newStageDB.getCollection(splitsStage[1])
.withWriteConcern(WriteConcern.MAJORITY.withJournal(true))
.withReadConcern(ReadConcern.MAJORITY);
logger.info("mongoStageCollection.getWriteConcern(): {} ", mongoStageCollection.getWriteConcern());
logger.info("mongoStageCollection.getReadConcern(): {} ",
mongoStageCollection.getReadConcern().toString());
logger.info("mongoStageCollection.getReadPreference(): {}",
mongoStageCollection.getReadPreference().getName());
try {
BulkWriteResult bulkWriteResult = stageCollectionNew.bulkWrite(listDoc, bulkWriteOptions);
logger.info("bulkWriteResult inserted count in STAGING: " + bulkWriteResult.getInsertedCount());
logger.info("bulkWriteResult modified count in STAGING: " + bulkWriteResult.getModifiedCount());
logger.info("bulkWriteResult matched count in STAGING: " + bulkWriteResult.getMatchedCount());
logger.info("bulkWriteResult deleted count in STAGING: " + bulkWriteResult.getDeletedCount());
logger.info("bulkWriteResult upserted count in STAGING: " + bulkWriteResult.getUpserts().size());
logger.info("bulkWriteResult was acknowledged in STAGING: " + bulkWriteResult.wasAcknowledged());
mongoClient3.close();
} catch (Exception e) {
logger.warn("Error in bulkWriting STAGING DB: {} ", e.getMessage());
logger.error(e.getMessage(), e);
}
例如
2 个数据库是 FHIR 和 FHIR_Stage。
在两个数据库中创建相同的集合。
FHIR.Condition 和 FHIR_STAGE.Condition
FHIR.Observation 和 FHIR_STAGE.Observation
等等...
FHIR 应该拥有所有数据,而 FHIR_Stage 应该只有增量数据。
但是,在初始加载中,两个数据库应该包含完全相同的数据。
我看到的是这两个数据库中集合中的计数不匹配
即 FHIR.Condition 中的计数与 FHIR_STAGE.Condition
中的计数不同
这里的问题是这种不匹配是随机发生的,即它有时匹配有时不匹配(当我清理所有内容并重新 运行 初始加载时)
这两个数据库中的不同集合都会发生这种情况。
而且它没有模式,随机一些集合计数不会匹配,有时一切都会匹配。
我已经有一个星期无法解决这个问题了。
非常感谢任何帮助。
Mongo数据库设置:
我们有一个 3 节点 (VM) 集群。我们有 3 个分片 运行ning,每个分片都是一个 3 成员副本集。每个节点都是其中一个副本集的主节点。
集群使用 x509 证书进行保护。
我在 sh.status() 或 rs.status() 中都没有看到任何错误。
也没有复制滞后。
数据库和集合是根据某些业务逻辑从 Java 代码动态创建的。我还在数据库上启用分片,然后在代码中的集合上启用分片。
WriteConcern - 多数
ReadPreference - 小学
ReadConcern - 多数
Mongo-版本:3.4.15
Mongo Java 驱动程序:3.4.2
仅供参考——相同的代码库在独立 MongoDB 上按预期工作。
感谢期待。
如果需要,我很乐意分享更多信息。
P.S.
如果有什么区别的话,写入MongoDB的进程是一个Kafka Consumer
我们在发布此消息几天后发现了这个问题 -
我们的是分片 MongoDB 集群。
这是因为我们没有使用聚合管道中的 count() 函数,而是依赖 db.collectionName.count()
转到聚合管道,我们可以在两个数据库中看到相同的文档。
Reference指出来
我正在尝试在 MongoDB 的 2 个数据库中的不同集合中批量插入某些文档。
MongoClient mongoClient2 = this.getMongoClient();
MongoDatabase currentDB = mongoClient2.getDatabase(splits[0]);
MongoCollection<Document> currentCollectionNew = currentDB.getCollection(splits[1])
.withWriteConcern(WriteConcern.MAJORITY.withJournal(true))
.withReadConcern(ReadConcern.MAJORITY);
BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
bulkWriteOptions.ordered(true);
bulkWriteOptions.bypassDocumentValidation(true);
try {
BulkWriteResult bulkWriteResult = urrentCollectionNew.bulkWrite(listDoc,
bulkWriteOptions);
logger.info("bulkWriteResult inserted count in MAIN: " + bulkWriteResult.getInsertedCount());
logger.info("bulkWriteResult modified count in MAIN: " + bulkWriteResult.getModifiedCount());
logger.info("bulkWriteResult matched count in MAIN : " + bulkWriteResult.getMatchedCount());
logger.info("bulkWriteResult deleted count in MAIN : " + bulkWriteResult.getDeletedCount());
logger.info("bulkWriteResult upserted count in MAIN : " + bulkWriteResult.getUpserts().size());
logger.info("bulkWriteResult was acknowledged in MAIN : " + bulkWriteResult.wasAcknowledged());
mongoClient2.close()
} catch (Exception e) {
logger.warn("Error in bulkWriting main DB: {} ", e.getMessage());
logger.error(e.getMessage(), e);
}
MongoCollection<Document> mongoStageCollection = objFactory.getCollectionObject(resourceType, true);
String[] splitsStage = mongoStageCollection.getNamespace().getFullName().split("\.");
MongoClient mongoClient3 = this.getMongoClient();
MongoDatabase newStageDB = mongoClient3.getDatabase(splitsStage[0]);
MongoCollection<Document> stageCollectionNew = newStageDB.getCollection(splitsStage[1])
.withWriteConcern(WriteConcern.MAJORITY.withJournal(true))
.withReadConcern(ReadConcern.MAJORITY);
logger.info("mongoStageCollection.getWriteConcern(): {} ", mongoStageCollection.getWriteConcern());
logger.info("mongoStageCollection.getReadConcern(): {} ",
mongoStageCollection.getReadConcern().toString());
logger.info("mongoStageCollection.getReadPreference(): {}",
mongoStageCollection.getReadPreference().getName());
try {
BulkWriteResult bulkWriteResult = stageCollectionNew.bulkWrite(listDoc, bulkWriteOptions);
logger.info("bulkWriteResult inserted count in STAGING: " + bulkWriteResult.getInsertedCount());
logger.info("bulkWriteResult modified count in STAGING: " + bulkWriteResult.getModifiedCount());
logger.info("bulkWriteResult matched count in STAGING: " + bulkWriteResult.getMatchedCount());
logger.info("bulkWriteResult deleted count in STAGING: " + bulkWriteResult.getDeletedCount());
logger.info("bulkWriteResult upserted count in STAGING: " + bulkWriteResult.getUpserts().size());
logger.info("bulkWriteResult was acknowledged in STAGING: " + bulkWriteResult.wasAcknowledged());
mongoClient3.close();
} catch (Exception e) {
logger.warn("Error in bulkWriting STAGING DB: {} ", e.getMessage());
logger.error(e.getMessage(), e);
}
例如 2 个数据库是 FHIR 和 FHIR_Stage。 在两个数据库中创建相同的集合。 FHIR.Condition 和 FHIR_STAGE.Condition
FHIR.Observation 和 FHIR_STAGE.Observation
等等...
FHIR 应该拥有所有数据,而 FHIR_Stage 应该只有增量数据。 但是,在初始加载中,两个数据库应该包含完全相同的数据。
我看到的是这两个数据库中集合中的计数不匹配 即 FHIR.Condition 中的计数与 FHIR_STAGE.Condition
中的计数不同这里的问题是这种不匹配是随机发生的,即它有时匹配有时不匹配(当我清理所有内容并重新 运行 初始加载时) 这两个数据库中的不同集合都会发生这种情况。 而且它没有模式,随机一些集合计数不会匹配,有时一切都会匹配。
我已经有一个星期无法解决这个问题了。 非常感谢任何帮助。
Mongo数据库设置:
我们有一个 3 节点 (VM) 集群。我们有 3 个分片 运行ning,每个分片都是一个 3 成员副本集。每个节点都是其中一个副本集的主节点。
集群使用 x509 证书进行保护。
我在 sh.status() 或 rs.status() 中都没有看到任何错误。 也没有复制滞后。
数据库和集合是根据某些业务逻辑从 Java 代码动态创建的。我还在数据库上启用分片,然后在代码中的集合上启用分片。
WriteConcern - 多数
ReadPreference - 小学
ReadConcern - 多数
Mongo-版本:3.4.15 Mongo Java 驱动程序:3.4.2
仅供参考——相同的代码库在独立 MongoDB 上按预期工作。
感谢期待。
如果需要,我很乐意分享更多信息。
P.S.
如果有什么区别的话,写入MongoDB的进程是一个Kafka Consumer
我们在发布此消息几天后发现了这个问题 -
我们的是分片 MongoDB 集群。
这是因为我们没有使用聚合管道中的 count() 函数,而是依赖 db.collectionName.count()
转到聚合管道,我们可以在两个数据库中看到相同的文档。
Reference指出来