如何在 Celery worker 中正确连接到 mongodb?

How to correctly connect to mongodb in Celery worker?

学习 Celery,阅读 Celery Best Practices,并且有一个关于 Celery 数据库使用的非常简单的问题。

丹尼·贝尔托维奇说:

You shouldn't pass Database objects (for instance your User model) to a background task because the serialized object might contain stale data.

所以,如果我想在worker中连接数据库,正确的选择是什么:

@app.task
def add(x, y, collection):
    client = MongoClient('mongodb://localhost:27017/')
    db = client.wakawaka
    db[collection].insert_one({'sum':x+y})
    return True

或:

client = MongoClient('mongodb://localhost:27017/')
db = client.wakawaka

@app.task
def add(x, y, collection):
    db[collection].insert_one({'sum':x+y})
    return True

?

UPD: 我可以在每个任务结束时 close() 我的 mongodb 连接,所以每次我需要一些东西时,任务都会连接到新的DB,没有资源浪费。不过,我需要open/close数据库连接那么多次吗?或者我可以连接一次并以某种方式刷新连接以检索新版本的数据库?

Opening/closing 每个事务的数据库连接使您免受因事务独立执行而导致的陈旧或不正确数据的错误。数据库事务的生命周期管理也得到了简化。

您可以在连接上下文管理器块中编写您的事务。这会处理关闭连接,因此无需显式关闭连接。它也是线程安全的。您还可以利用内置的连接池在出现异常时重试。

@app.task
def add(x, y, collection):
    with MongoClient('mongodb://localhost:27017') as connection:
        db = connection.db
        db.collection.insert_one({'sum':x+y})
    return True

请注意,如果您要更新多个文档,这不会保持数据更新的事务完整性。

从Mongodb4.0.0和pymongo 3.7开始支持多文档ACID事务