正确使用带生成器的线程池
Correctly use ThreadPool with Generators
在 Python 2.7 中处理 CSV 文件时,我在使用带有 Generator
的 ThreadPool
时遇到问题。下面是一些示例代码来说明我的观点:
from multiprocessing.dummy import Pool as ThreadPool
import time
def getNextBatch():
# Reads lines from a huge CSV and yields them as required.
for i in range(5):
yield i;
def processBatch(batch):
# This simulates a slow network request that happens.
time.sleep(1);
print "Processed Batch " + str(batch);
# We use 4 threads to attempt to aleviate the bottleneck caused by network I/O.
threadPool = ThreadPool(processes = 4)
batchGenerator = getNextBatch()
for batch in batchGenerator:
threadPool.map(processBatch, (batch,))
threadPool.close()
threadPool.join()
当我 运行 这样做时,我得到了预期的输出:
Processed Batch 0
Processed Batch 1
Processed Batch 2
Processed Batch 3
Processed Batch 4
问题是它们在每次打印之间出现 1 秒延迟。实际上,我的脚本是按顺序 运行ning(并没有像我希望的那样使用多线程)。
这里的目标是让这些打印的语句在 ~1 秒后全部出现,而不是每秒一个,持续 5 秒。
这是你的问题
for batch in batchGenerator:
threadPool.map(processBatch, (batch,))
当我尝试时
threadPool.map(processBatch, batchGenerator)
它按预期工作(但不按顺序)。 for 循环使用线程池一次处理一个批次。所以它完成了一个,然后继续前进,然后......
在 Python 2.7 中处理 CSV 文件时,我在使用带有 Generator
的 ThreadPool
时遇到问题。下面是一些示例代码来说明我的观点:
from multiprocessing.dummy import Pool as ThreadPool
import time
def getNextBatch():
# Reads lines from a huge CSV and yields them as required.
for i in range(5):
yield i;
def processBatch(batch):
# This simulates a slow network request that happens.
time.sleep(1);
print "Processed Batch " + str(batch);
# We use 4 threads to attempt to aleviate the bottleneck caused by network I/O.
threadPool = ThreadPool(processes = 4)
batchGenerator = getNextBatch()
for batch in batchGenerator:
threadPool.map(processBatch, (batch,))
threadPool.close()
threadPool.join()
当我 运行 这样做时,我得到了预期的输出:
Processed Batch 0
Processed Batch 1
Processed Batch 2
Processed Batch 3
Processed Batch 4
问题是它们在每次打印之间出现 1 秒延迟。实际上,我的脚本是按顺序 运行ning(并没有像我希望的那样使用多线程)。
这里的目标是让这些打印的语句在 ~1 秒后全部出现,而不是每秒一个,持续 5 秒。
这是你的问题
for batch in batchGenerator:
threadPool.map(processBatch, (batch,))
当我尝试时
threadPool.map(processBatch, batchGenerator)
它按预期工作(但不按顺序)。 for 循环使用线程池一次处理一个批次。所以它完成了一个,然后继续前进,然后......