有效地将数千条记录更新到 mongodb
Efficiently upsert thousands of records to mongodb
我们有一个包含约 2000 万条记录的数据库,其中包含一个索引字段 order_id。
每一天,每小时之后,我们都会收到增量更新,从 ~2K 开始增长到 ~50K 记录,其中一些可能是新的,而另一些是对以前记录的更新。
为了处理这些数据,我使用 Airflow 创建了一个管道。在将数据推送到 MongoDB 之前,数据可作为 Pandas 数据框使用,因为我正在使用它来处理和清理数据。
目前,我正在使用此代码更新插入 数据。但我不确定它是否是正确或有效的解决方案。
from pymongo import ReplaceOne
col = MongoDB.get_collection("collection", mongo_db="database")
# My processed data
df
df.reset_index()
bulk_data = [
ReplaceOne({"order_id": row["order_id"]}, dict(row), upsert=True)
for index, row in df.iterrows()
]
col.bulk_write(bulk_data)
那么其他选项可能是什么?这种做任务的方式合乎逻辑还是我做错了?
你有一个最有效的技术;您的查询字段已编入索引并且您正在使用批量操作。如果这是 运行 缓慢,我会感到惊讶,即使是在 50k 条记录上也是如此。
如果您想压榨出最后一滴性能,这种方法可能会更快。它删除所有传入记录和 re-inserts 它们;通常这比使用 ReplaceOne()
和 upsert=True
更高效。此外,与使用 iterrows()
相比,pandas 中的 to_dict()
方法删除了一个步骤。最后,您可以将 bulk ordered 选项设置为 False,这又是一个性能提升,因为您大概不关心插入的顺序。
col.delete_many({"order_id": {'$in': list(df["order_id"])}})
bulk_data = [InsertOne(row) for row in df.to_dict(orient='records')]
col.bulk_write(bulk_data, ordered=False)
我们有一个包含约 2000 万条记录的数据库,其中包含一个索引字段 order_id。
每一天,每小时之后,我们都会收到增量更新,从 ~2K 开始增长到 ~50K 记录,其中一些可能是新的,而另一些是对以前记录的更新。
为了处理这些数据,我使用 Airflow 创建了一个管道。在将数据推送到 MongoDB 之前,数据可作为 Pandas 数据框使用,因为我正在使用它来处理和清理数据。
目前,我正在使用此代码更新插入 数据。但我不确定它是否是正确或有效的解决方案。
from pymongo import ReplaceOne
col = MongoDB.get_collection("collection", mongo_db="database")
# My processed data
df
df.reset_index()
bulk_data = [
ReplaceOne({"order_id": row["order_id"]}, dict(row), upsert=True)
for index, row in df.iterrows()
]
col.bulk_write(bulk_data)
那么其他选项可能是什么?这种做任务的方式合乎逻辑还是我做错了?
你有一个最有效的技术;您的查询字段已编入索引并且您正在使用批量操作。如果这是 运行 缓慢,我会感到惊讶,即使是在 50k 条记录上也是如此。
如果您想压榨出最后一滴性能,这种方法可能会更快。它删除所有传入记录和 re-inserts 它们;通常这比使用 ReplaceOne()
和 upsert=True
更高效。此外,与使用 iterrows()
相比,pandas 中的 to_dict()
方法删除了一个步骤。最后,您可以将 bulk ordered 选项设置为 False,这又是一个性能提升,因为您大概不关心插入的顺序。
col.delete_many({"order_id": {'$in': list(df["order_id"])}})
bulk_data = [InsertOne(row) for row in df.to_dict(orient='records')]
col.bulk_write(bulk_data, ordered=False)