Mongodb 批量写入 Updateone 或 Updatemany

Mongodb Bulk write Updateone or Updatemany

我想知道它是否更快(导入)使用 updateoneupdatemany 批量 write.My 代码将数据导入带有 pymongo 的集合看起来是这样的:

for file in sorted_files:
    df = process_file(file)
    for row, item in df.iterrows():
        data_dict = item.to_dict()
        bulk_request.append(UpdateOne(
            {"nsamples": {"$lt": 12}},
            {
                "$push": {"samples": data_dict},
                "$inc": {"nsamples": 1}
            },
            upsert=True
        ))
    result = mycol1.bulk_write(bulk_request)

当我尝试更新许多时,我唯一改变的是:

...
...
bulk_request.append(UpdateMany(..
..
..

我没有发现插入有任何重大差异 time.Shouldnt updateMany 会更快吗? 也许我正在做某事 wrong.Any 建议会有所帮助! 提前致谢!

注意:我的数据有120万行。我需要每个文档包含12个子文档。

updateOne 仅更新 nsamples: {$lt: 12} 第一个 匹配文档。所以 updateOne 应该更快。

可是,为什么要一一插入呢?将所有内容放在一个文档中并进行一次更新。与此相似:

sample_data = [];
for row, item in df.iterrows():
    data_dict = item.to_dict();
    sample_data.append(data_dict);
db.mycol1.updateOne(
  {"nsamples": {"$lt": 12}},
  { 
     "$push": { samples: { $each: sample_data } },
     "$inc": {"nsamples": len(sample_data) }
  }
)

@Wernfried Domscheit's回答正确。

此答案特定于您的场景。

如果您不介意不更新现有文档的记录并完全插入新文档,请使用以下最适合您的用例的代码。

sorted_files = []
process_file = None
for file in sorted_files:
    df = process_file(file)
    sample_data = []
    for row, item in df.iterrows():
        sample_data.append(item.to_dict())
        if len(sample_data) == 12:
            mycol1.insertOne({
                "samples": sample_data,
                "nsamples": len(sample_data),
            })
            sample_data = []
    mycol1.insertOne({
        "samples": sample_data,
        "nsamples": len(sample_data),
    })

如果您想用 12 个对象填充现有记录,然后, 创建新记录,使用以下代码逻辑。

Note: I have not tested the code in my local, its just to understand the flow for you to use.

for file in sorted_files:
    df = process_file(file)
    sample_data = []
    continuity_flag = False
    for row, item in df.iterrows():
        sample_data.append(item.to_dict())
        if not continuity_flag:
            sample_rec = mycol1.find_one({"nsamples": {"$lt": 12}}, {"nsamples": 1})
            if sample_rec is None:
                continuity_flag = True
            elif sample_rec["nsamples"] + len(sample_data) == 12:
                mycol1.update_one({
                    "_id": sample_rec["_id"]
                }, {
                    "$push": {"samples": {"$each": sample_data}},
                    "$inc": {"nsamples": len(sample_data)}
                })
        if len(sample_data) == 12:
            mycol1.insert_one({
                "samples": sample_data,
                "nsamples": len(sample_data),
            })
            sample_data = []
    if sample_data:
        mycol1.insert_one({
            "samples": sample_data,
            "nsamples": len(sample_data),
        })