测试 motor.MotorClient() 连接

Testing motor.MotorClient() connection

我想知道如何检查电机连接是否成功。如果我终止 mongod 进程并执行:

con = motor.MotorClient(host,port)

我得到"[W 150619 00:38:38 iostream:1126] Connect error on fd 11: ECONNREFUSED"

有意义,因为没有服务器 运行。但是,因为这不是例外,所以我不确定如何检查这种情况?

我以为我可以检查 con.is_mongos 但是它似乎总是错误的(也在文档中说明)。

如何检查上述错误情况?

一般来说,答案是不要。不要检查与 MongoDB 的连接是否正常。您可能会发现它现在正在运行,但下一个操作可能无论如何都会失败——您的系统管理员可能会拔掉您的电缆、服务器崩溃,等等。由于无论如何都需要稍后在应用程序中处理错误,因此提前检查电机是否已成功连接是没有意义的。

如果你坚持这种毫无意义的检查,即使知道它对你没有任何好处,那么你可以这样做:

@gen.coroutine
def am_i_momentarily_connected_to_mongodb():
    yield MotorClient().admin.command('ismaster')

如果您的 IOLoop 已经启动并且您在协程中,那么您可以执行以下操作:

yield am_i_momentarily_connected_to_mongodb()

或者如果您还没有开始循环:

IOLoop.instance().run_sync(am_i_momentarily_connected_to_mongodb)

但是就像我说的,发现服务器可用是分布式系统的本质现在不会告诉你任何关于接下来会发生什么的事情.

虽然 A. Jesse Jiryu Davis 说得有道理。当你想检查你的连接时,当然有有效的情况。例如,当您启动您的程序时,用户可能忘记启动 MongoDb。一条漂亮的错误消息胜过冗长的堆栈跟踪。

检查连接的方法如下:

import asyncio
import motor.motor_asyncio
async def get_server_info():
    # replace this with your MongoDB connection string
    conn_str = "<your MongoDB Atlas connection string>"
    # set a 5-second connection timeout
    client = motor.motor_asyncio.AsyncIOMotorClient(conn_str, serverSelectionTimeoutMS=5000)
    try:
        print(await client.server_info())
    except Exception:
        print("Unable to connect to the server.")
loop = asyncio.get_event_loop()
loop.run_until_complete(get_server_info())

来源:https://docs.mongodb.com/drivers/motor/#std-label-connect-atlas-motor-driver