Python MongoDB (PyMongo) 多重处理游标
Python MongoDB (PyMongo) Mutliprocessing cursor
我正在尝试制作一个多处理 MongoDB 实用程序,它运行良好,但我认为我遇到了性能问题...即使有 20 个工人,它也不会处理超过 2800 个文档第二...我想我可以快 5 倍...这是我的代码,它没有做任何异常,只是打印到光标末尾的剩余时间。
也许有更好的方法在 MongoDB 游标上执行多处理,因为我需要 运行 每个文档上有 17.4M 记录集合的一些东西,所以性能和时间更少必须的。
START = time.time()
def remaining_time(a, b):
if START:
y = (time.time() - START)
z = ((a * y) / b) - y
d = time.strftime('%H:%M:%S', time.gmtime(z))
e = round(b / y)
progress("{0}/{1} | Tiempo restante {2} ({3}p/s)".format(b, a, d, e), b, a)
def progress(p, c, t):
pc = (c * 100) / t
sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
sys.stdout.flush()
def dowork(queue):
for p, i, pcount in iter(queue.get, 'STOP'):
remaining_time(pcount, i)
def populate_jobs(queue):
mongo_query = {}
products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
if products:
pcount = products.count()
i = 1
print "Procesando %s productos..." % pcount
for p in products:
try:
queue.put((p, i, pcount))
i += 1
except Exception, e:
utils.log(e)
continue
queue.put('STOP')
def main():
queue = multiprocessing.Queue()
procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]
for p in procs:
p.start()
populate_jobs(queue)
for p in procs:
p.join()
此外,我注意到大约每 2500 个 aprox 文档,脚本就会暂停大约 0.5 - 1 秒,这显然是一个坏问题。这是一个 MongoDB 问题,因为如果我执行完全相同的循环但使用 range(0, 1000000)
脚本根本不会暂停并且 运行s 每秒迭代 57,000 次,总共 20秒结束脚本...与每秒 2,800 MongoDB 个文档的巨大差异...
这是 运行 1,000,000 次迭代循环的代码,而不是文档。
def populate_jobs(queue):
mongo_query = {}
products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
if products:
pcount = 1000000
i = 1
print "Procesando %s productos..." % pcount
for p in range(0, 1000000):
queue.put((p, i, pcount))
i += 1
queue.put('STOP')
更新
如我所见,问题不在于多处理本身,是光标填充 Queue
而不是 运行 在多处理模式下,它是一个简单的过程,填充 Queue
(populateJobs
方法)也许如果我可以使游标 multithread/multirpocess 并并行填充 Queue
它会填充得更快,那么多处理方法 dowork
会做得更快,因为我我认为存在一个瓶颈,我在 Queue
中每秒只能填充大约 2,800 个项目,而在 dowork
多进程中检索更多项目,但我不知道如何并行化 MongoDB
游标。
也许,问题是我的计算机和服务器 MongoDB 之间的延迟。在我请求下一个光标和 MongoDB 告诉我哪个是之间的延迟使我的性能降低了 2000%(从 61,000 str/s 到 2,800 doc/s)
不 我在本地主机上试过 MongoDB 性能完全一样...这让我抓狂
为什么要使用多处理?您似乎没有使用队列在其他线程中进行实际工作。 Python 有一个 global interpreter lock 这使得多线程代码的性能低于您的预期。它可能使这个程序变慢,而不是变快。
几个性能提示:
尝试在您的 find() 调用中将 batch_size
设置为某个大数字(例如 20000)。这是一次返回的最大文档数,在客户端获取更多之前,默认是101.
尝试将 cursor_type
设置为 pymongo.cursor.CursorType.EXHAUST
,这可能会减少您遇到的延迟。
以下是如何使用 Pool
喂养 children:
START = time.time()
def remaining_time(a, b):
if START:
y = (time.time() - START)
z = ((a * y) / b) - y
d = time.strftime('%H:%M:%S', time.gmtime(z))
e = round(b / y)
progress("{0}/{1} | Tiempo restante {2} ({3}p/s)".format(b, a, d, e), b, a)
def progress(p, c, t):
pc = (c * 100) / t
sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
sys.stdout.flush()
def dowork(args):
p, i, pcount = args
remaining_time(pcount, i)
def main():
queue = multiprocessing.Queue()
procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]
pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
mongo_query = {}
products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
pcount = products.count()
pool.map(dowork, ((p, idx, pcount) for idx,p in enumerate(products)))
pool.close()
pool.join()
请注意,使用 pool.map
需要立即将游标中的所有内容加载到内存中,但这可能是个问题,因为它很大。您可以使用 imap
来避免一次消耗整个东西,但您需要指定 chunksize
以最小化 IPC 开销:
# Calculate chunksize using same algorithm used internally by pool.map
chunksize, extra = divmod(pcount, CONFIG_POOL_SIZE * 4)
if extra:
chunksize += 1
pool.imap(dowork, ((p, idx, pcount) for idx,p in enumerate(products)), chunksize=chunksize)
pool.close()
pool.join()
对于 1,000,000 个项目,块大小为 12,500。您可以尝试比这更大或更小的尺寸,看看它对性能有何影响。
不过,如果瓶颈实际上只是从 MongoDB 中提取数据,我不确定这会有多大帮助。
我正在尝试制作一个多处理 MongoDB 实用程序,它运行良好,但我认为我遇到了性能问题...即使有 20 个工人,它也不会处理超过 2800 个文档第二...我想我可以快 5 倍...这是我的代码,它没有做任何异常,只是打印到光标末尾的剩余时间。
也许有更好的方法在 MongoDB 游标上执行多处理,因为我需要 运行 每个文档上有 17.4M 记录集合的一些东西,所以性能和时间更少必须的。
START = time.time()
def remaining_time(a, b):
if START:
y = (time.time() - START)
z = ((a * y) / b) - y
d = time.strftime('%H:%M:%S', time.gmtime(z))
e = round(b / y)
progress("{0}/{1} | Tiempo restante {2} ({3}p/s)".format(b, a, d, e), b, a)
def progress(p, c, t):
pc = (c * 100) / t
sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
sys.stdout.flush()
def dowork(queue):
for p, i, pcount in iter(queue.get, 'STOP'):
remaining_time(pcount, i)
def populate_jobs(queue):
mongo_query = {}
products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
if products:
pcount = products.count()
i = 1
print "Procesando %s productos..." % pcount
for p in products:
try:
queue.put((p, i, pcount))
i += 1
except Exception, e:
utils.log(e)
continue
queue.put('STOP')
def main():
queue = multiprocessing.Queue()
procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]
for p in procs:
p.start()
populate_jobs(queue)
for p in procs:
p.join()
此外,我注意到大约每 2500 个 aprox 文档,脚本就会暂停大约 0.5 - 1 秒,这显然是一个坏问题。这是一个 MongoDB 问题,因为如果我执行完全相同的循环但使用 range(0, 1000000)
脚本根本不会暂停并且 运行s 每秒迭代 57,000 次,总共 20秒结束脚本...与每秒 2,800 MongoDB 个文档的巨大差异...
这是 运行 1,000,000 次迭代循环的代码,而不是文档。
def populate_jobs(queue):
mongo_query = {}
products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
if products:
pcount = 1000000
i = 1
print "Procesando %s productos..." % pcount
for p in range(0, 1000000):
queue.put((p, i, pcount))
i += 1
queue.put('STOP')
更新
如我所见,问题不在于多处理本身,是光标填充 Queue
而不是 运行 在多处理模式下,它是一个简单的过程,填充 Queue
(populateJobs
方法)也许如果我可以使游标 multithread/multirpocess 并并行填充 Queue
它会填充得更快,那么多处理方法 dowork
会做得更快,因为我我认为存在一个瓶颈,我在 Queue
中每秒只能填充大约 2,800 个项目,而在 dowork
多进程中检索更多项目,但我不知道如何并行化 MongoDB
游标。
也许,问题是我的计算机和服务器 MongoDB 之间的延迟。在我请求下一个光标和 MongoDB 告诉我哪个是之间的延迟使我的性能降低了 2000%(从 61,000 str/s 到 2,800 doc/s) 不 我在本地主机上试过 MongoDB 性能完全一样...这让我抓狂
为什么要使用多处理?您似乎没有使用队列在其他线程中进行实际工作。 Python 有一个 global interpreter lock 这使得多线程代码的性能低于您的预期。它可能使这个程序变慢,而不是变快。
几个性能提示:
尝试在您的 find() 调用中将
batch_size
设置为某个大数字(例如 20000)。这是一次返回的最大文档数,在客户端获取更多之前,默认是101.尝试将
cursor_type
设置为pymongo.cursor.CursorType.EXHAUST
,这可能会减少您遇到的延迟。
以下是如何使用 Pool
喂养 children:
START = time.time()
def remaining_time(a, b):
if START:
y = (time.time() - START)
z = ((a * y) / b) - y
d = time.strftime('%H:%M:%S', time.gmtime(z))
e = round(b / y)
progress("{0}/{1} | Tiempo restante {2} ({3}p/s)".format(b, a, d, e), b, a)
def progress(p, c, t):
pc = (c * 100) / t
sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
sys.stdout.flush()
def dowork(args):
p, i, pcount = args
remaining_time(pcount, i)
def main():
queue = multiprocessing.Queue()
procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]
pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
mongo_query = {}
products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
pcount = products.count()
pool.map(dowork, ((p, idx, pcount) for idx,p in enumerate(products)))
pool.close()
pool.join()
请注意,使用 pool.map
需要立即将游标中的所有内容加载到内存中,但这可能是个问题,因为它很大。您可以使用 imap
来避免一次消耗整个东西,但您需要指定 chunksize
以最小化 IPC 开销:
# Calculate chunksize using same algorithm used internally by pool.map
chunksize, extra = divmod(pcount, CONFIG_POOL_SIZE * 4)
if extra:
chunksize += 1
pool.imap(dowork, ((p, idx, pcount) for idx,p in enumerate(products)), chunksize=chunksize)
pool.close()
pool.join()
对于 1,000,000 个项目,块大小为 12,500。您可以尝试比这更大或更小的尺寸,看看它对性能有何影响。
不过,如果瓶颈实际上只是从 MongoDB 中提取数据,我不确定这会有多大帮助。