multiprocessing.Pool.imap_unordered 具有固定队列大小或缓冲区?
multiprocessing.Pool.imap_unordered with fixed queue size or buffer?
我正在从大型 CSV 文件中读取数据,对其进行处理,然后将其加载到 SQLite 数据库中。分析表明我 80% 的时间花在 I/O 上,20% 的时间花在处理输入以为数据库插入做准备。我用 multiprocessing.Pool
加快了处理步骤,因此 I/O 代码永远不会等待下一条记录。但是,这导致了严重的内存问题,因为 I/O 步骤跟不上工人。
以下玩具示例说明了我的问题:
#!/usr/bin/env python # 3.4.3
import time
from multiprocessing import Pool
def records(num=100):
"""Simulate generator getting data from large CSV files."""
for i in range(num):
print('Reading record {0}'.format(i))
time.sleep(0.05) # getting raw data is fast
yield i
def process(rec):
"""Simulate processing of raw text into dicts."""
print('Processing {0}'.format(rec))
time.sleep(0.1) # processing takes a little time
return rec
def writer(records):
"""Simulate saving data to SQLite database."""
for r in records:
time.sleep(0.3) # writing takes the longest
print('Wrote {0}'.format(r))
if __name__ == "__main__":
data = records(100)
with Pool(2) as pool:
writer(pool.imap_unordered(process, data, chunksize=5))
此代码导致记录积压,最终耗尽所有内存,因为我无法足够快地将数据持久保存到磁盘。 运行 代码,您会注意到当 writer
位于第 15 条记录左右时,Pool.imap_unordered
将消耗所有数据。现在想象一下处理步骤是从数亿行中生成字典,你就会明白为什么我 运行 内存不足了。 Amdahl's Law 也许在行动。
解决这个问题的方法是什么?我想我需要某种缓冲区 Pool.imap_unordered
说 "once there are x records that need insertion, stop and wait until there are less than x before making more." 我应该能够在保存最后一条记录的同时准备下一条记录来提高速度。
我尝试使用 papy
模块(我将其修改为与 Python 3 一起使用)中的 NuMap
来执行此操作,但速度并不快。事实上,这比按顺序 运行ning 程序更糟糕; NuMap
使用两个线程加多个进程。
SQLite 的批量导入功能可能不适合我的任务,因为数据需要大量处理和规范化。
我有大约 85G 的压缩文本要处理。我对其他数据库技术持开放态度,但选择 SQLite 是为了易于使用,因为这是一次写入多次读取的工作,在加载所有内容后,只有 3 或 4 个人会使用生成的数据库。
听起来您真正需要的只是用有界(和阻塞)队列替换 Pool
下面的无界队列。这样,如果任何一方领先于其他一方,它就会一直阻塞,直到他们准备好。
通过查看 the source,子类化或 monkeypatch Pool
,这很容易做到,例如:
class Pool(multiprocessing.pool.Pool):
def _setup_queues(self):
self._inqueue = self._ctx.Queue(5)
self._outqueue = self._ctx.Queue(5)
self._quick_put = self._inqueue._writer.send
self._quick_get = self._outqueue._reader.recv
self._taskqueue = queue.Queue(10)
但这显然不可移植(即使是 CPython 3.3,更不用说不同的 Python 3 实现)。
我 认为 你可以通过提供自定义 context
在 3.4+ 中移植它,但我没能做到这一点,所以......
由于处理速度很快,但是写入速度很慢,听起来你的问题是
I/O-bound。因此,使用可能不会有太多收获
多处理。
但是,可以剥离 data
的块,处理块,然后
等到数据写入后再剥离另一个块:
import itertools as IT
if __name__ == "__main__":
data = records(100)
with Pool(2) as pool:
chunksize = ...
for chunk in iter(lambda: list(IT.islice(data, chunksize)), []):
writer(pool.imap_unordered(process, chunk, chunksize=5))
当我在处理同样的问题时,我认为防止池过载的有效方法是使用带有生成器的信号量:
from multiprocessing import Pool, Semaphore
def produce(semaphore, from_file):
with open(from_file) as reader:
for line in reader:
# Reduce Semaphore by 1 or wait if 0
semaphore.acquire()
# Now deliver an item to the caller (pool)
yield line
def process(item):
result = (first_function(item),
second_function(item),
third_function(item))
return result
def consume(semaphore, result):
database_con.cur.execute("INSERT INTO ResultTable VALUES (?,?,?)", result)
# Result is consumed, semaphore may now be increased by 1
semaphore.release()
def main()
global database_con
semaphore_1 = Semaphore(1024)
with Pool(2) as pool:
for result in pool.imap_unordered(process, produce(semaphore_1, "workfile.txt"), chunksize=128):
consume(semaphore_1, result)
另请参阅:
一个简单的解决方法可能是使用 psutil 检测每个进程中的内存使用情况,并判断是否占用了超过 90% 的内存,而不是只休眠一会儿。
while psutil.virtual_memory().percent > 75:
time.sleep(1)
print ("process paused for 1 seconds!")
我正在从大型 CSV 文件中读取数据,对其进行处理,然后将其加载到 SQLite 数据库中。分析表明我 80% 的时间花在 I/O 上,20% 的时间花在处理输入以为数据库插入做准备。我用 multiprocessing.Pool
加快了处理步骤,因此 I/O 代码永远不会等待下一条记录。但是,这导致了严重的内存问题,因为 I/O 步骤跟不上工人。
以下玩具示例说明了我的问题:
#!/usr/bin/env python # 3.4.3
import time
from multiprocessing import Pool
def records(num=100):
"""Simulate generator getting data from large CSV files."""
for i in range(num):
print('Reading record {0}'.format(i))
time.sleep(0.05) # getting raw data is fast
yield i
def process(rec):
"""Simulate processing of raw text into dicts."""
print('Processing {0}'.format(rec))
time.sleep(0.1) # processing takes a little time
return rec
def writer(records):
"""Simulate saving data to SQLite database."""
for r in records:
time.sleep(0.3) # writing takes the longest
print('Wrote {0}'.format(r))
if __name__ == "__main__":
data = records(100)
with Pool(2) as pool:
writer(pool.imap_unordered(process, data, chunksize=5))
此代码导致记录积压,最终耗尽所有内存,因为我无法足够快地将数据持久保存到磁盘。 运行 代码,您会注意到当 writer
位于第 15 条记录左右时,Pool.imap_unordered
将消耗所有数据。现在想象一下处理步骤是从数亿行中生成字典,你就会明白为什么我 运行 内存不足了。 Amdahl's Law 也许在行动。
解决这个问题的方法是什么?我想我需要某种缓冲区 Pool.imap_unordered
说 "once there are x records that need insertion, stop and wait until there are less than x before making more." 我应该能够在保存最后一条记录的同时准备下一条记录来提高速度。
我尝试使用 papy
模块(我将其修改为与 Python 3 一起使用)中的 NuMap
来执行此操作,但速度并不快。事实上,这比按顺序 运行ning 程序更糟糕; NuMap
使用两个线程加多个进程。
SQLite 的批量导入功能可能不适合我的任务,因为数据需要大量处理和规范化。
我有大约 85G 的压缩文本要处理。我对其他数据库技术持开放态度,但选择 SQLite 是为了易于使用,因为这是一次写入多次读取的工作,在加载所有内容后,只有 3 或 4 个人会使用生成的数据库。
听起来您真正需要的只是用有界(和阻塞)队列替换 Pool
下面的无界队列。这样,如果任何一方领先于其他一方,它就会一直阻塞,直到他们准备好。
通过查看 the source,子类化或 monkeypatch Pool
,这很容易做到,例如:
class Pool(multiprocessing.pool.Pool):
def _setup_queues(self):
self._inqueue = self._ctx.Queue(5)
self._outqueue = self._ctx.Queue(5)
self._quick_put = self._inqueue._writer.send
self._quick_get = self._outqueue._reader.recv
self._taskqueue = queue.Queue(10)
但这显然不可移植(即使是 CPython 3.3,更不用说不同的 Python 3 实现)。
我 认为 你可以通过提供自定义 context
在 3.4+ 中移植它,但我没能做到这一点,所以......
由于处理速度很快,但是写入速度很慢,听起来你的问题是 I/O-bound。因此,使用可能不会有太多收获 多处理。
但是,可以剥离 data
的块,处理块,然后
等到数据写入后再剥离另一个块:
import itertools as IT
if __name__ == "__main__":
data = records(100)
with Pool(2) as pool:
chunksize = ...
for chunk in iter(lambda: list(IT.islice(data, chunksize)), []):
writer(pool.imap_unordered(process, chunk, chunksize=5))
当我在处理同样的问题时,我认为防止池过载的有效方法是使用带有生成器的信号量:
from multiprocessing import Pool, Semaphore
def produce(semaphore, from_file):
with open(from_file) as reader:
for line in reader:
# Reduce Semaphore by 1 or wait if 0
semaphore.acquire()
# Now deliver an item to the caller (pool)
yield line
def process(item):
result = (first_function(item),
second_function(item),
third_function(item))
return result
def consume(semaphore, result):
database_con.cur.execute("INSERT INTO ResultTable VALUES (?,?,?)", result)
# Result is consumed, semaphore may now be increased by 1
semaphore.release()
def main()
global database_con
semaphore_1 = Semaphore(1024)
with Pool(2) as pool:
for result in pool.imap_unordered(process, produce(semaphore_1, "workfile.txt"), chunksize=128):
consume(semaphore_1, result)
另请参阅:
一个简单的解决方法可能是使用 psutil 检测每个进程中的内存使用情况,并判断是否占用了超过 90% 的内存,而不是只休眠一会儿。
while psutil.virtual_memory().percent > 75:
time.sleep(1)
print ("process paused for 1 seconds!")