在具有重复键的批量插入中查找插入文档的数量

Finding number of inserted documents in a bulk insert with duplicate keys

我正在对 mongodb 数据库进行批量插入。我知道 99% 的记录插入都会因为重复键错误而失败。我想在插入后打印有多少条新记录被插入到数据库中。所有这些都是通过龙卷风电机 mongodb 驱动程序在 python 中完成的,但这可能并不重要。

try:
    bulk_write_result = yield db.collections.probe.insert(dataarray, continue_on_error=True)
    nr_inserts = bulk_write_result["nInserted"]
except pymongo.errors.DuplicateKeyError as e:
    nr_inserts = ????  <--- what should I put here?

由于抛出异常,bulk_write_result为空。显然我可以(并发问题除外)在插入之前和之后对整个集合进行计数,但我不喜欢为了日志文件中的一行而额外往返数据库。那么有什么方法可以发现实际插入了多少条记录?

我不清楚你为什么 yield 你的插入结果。但是,关于 bulk inserts:

  • 你应该使用 insert_many as insert 已弃用;
  • ordered 关键字设置为 False 时,如果出现错误,您的插入将继续;
  • 在出错的情况下,insert_many 会抛出一个BulkWriteError,你可以通过查询获得插入文档的数量。

所有这些都导致了类似的事情:

try:
  insert_many_result = db.collections.probe.insert_many(dataaray,ordered=False)
  nr_inserts = len(insert_many_result.inserted_ids)
except pymongo.errors.BulkWriteError as bwe:
  nr_inserts = bwe.details["nInserted"]

如果您需要确定写入错误背后的 原因,您将必须检查 bwe.details['writeErrors'] 数组。代码值 11000 表示 "Duplicate key error":

>>> pprint(e.details['writeErrors'])
[{'code': 11000,
  'errmsg': 'E11000 duplicate key error index: test.w.$k_1 dup key: { : 1 }',
  'index': 0,
  'op': {'_id': ObjectId('555465cacf96c51208587eac'), 'k': 1}},
 {'code': 11000,
  'errmsg': 'E11000 duplicate key error index: test.w.$k_1 dup key: { : 3 }',
  'index': 1,
  'op': {'_id': ObjectId('555465cacf96c51208587ead'), 'k': 3}}

在这里,如您所见,我试图在 test 数据库的 w 集合中插入两个文档。由于重复键错误,两次插入均失败。

常规插入 continue_on_error 无法报告您想要的信息。但是,如果您使用的是 MongoDB 2.6 或更高版本,我们有一个具有良好错误报告功能的高性能解决方案。这是一个使用 Motor 的 BulkOperationBuilder 的完整示例:

import pymongo.errors
from tornado import gen
from tornado.ioloop import IOLoop
from motor import MotorClient

db = MotorClient()
dataarray = [{'_id': 0},
             {'_id': 0},  # Duplicate.
             {'_id': 1}]


@gen.coroutine
def my_insert():
    try:
        bulk = db.collections.probe.initialize_unordered_bulk_op()

        # Prepare the operation on the client.
        for doc in dataarray:
            bulk.insert(doc)

        # Send to the server all at once.
        bulk_write_result = yield bulk.execute()
        nr_inserts = bulk_write_result["nInserted"]
    except pymongo.errors.BulkWriteError as e:
        print(e)
        nr_inserts = e.details['nInserted']

    print('nr_inserts: %d' % nr_inserts)


IOLoop.instance().run_sync(my_insert)

完整文档:http://motor.readthedocs.org/en/stable/examples/bulk.html

注意 2.6 之前 MongoDB 上批量插入性能不佳的警告!它仍然可以工作,但需要每个文档单独往返一次。在 2.6+ 中,驱动程序在一次往返中将整个操作发送到服务器,服务器报告成功的次数和失败的次数。