Python - multiprocessing threads don't close when using Queue
This is for Python 3.x.
I'm loading records from a CSV file in chunks of 300 and then spawning worker threads to submit them to a REST API. I save the HTTP responses in a Queue so that I can count the number of skipped records once the entire CSV file has been processed. However, after I added the Queue to the worker, the threads no longer seem to close. I want to monitor the number of threads for two reasons: (1) once they are all done, I can calculate and display the skipped count, and (2) I'd like to enhance the script so it spawns no more than 20 or so threads, so that I don't run out of memory.
I have two questions:
- Can someone explain why the threads stay active when using q.put()?
- Is there a different way to manage the number of threads, and to monitor whether they have all finished?
Here is my code (somewhat simplified, because I can't share the exact details of the API I'm calling):
import requests, json, csv, time, datetime, multiprocessing

TEST_FILE = 'file.csv'

def read_test_data(path, chunksize=300):
    leads = []
    with open(path, 'rU') as data:
        reader = csv.DictReader(data)
        for index, row in enumerate(reader):
            if (index % chunksize == 0 and index > 0):
                yield leads
                del leads[:]
            leads.append(row)
        yield leads

def worker(leads, q):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    q.put(r.text) # this puts the response in a queue for later analysis
    return

if __name__ == "__main__":
    q = multiprocessing.Queue() # this is a queue to put all HTTP responses in, so we count the skips
    jobs = []

    for leads in read_test_data(TEST_FILE): # This function reads a CSV file and provides 300 records at a time
        p = multiprocessing.Process(target=worker, args=(leads,q,))
        jobs.append(p)
        p.start()

    time.sleep(20) # checking if processes are closing automatically (they don't)
    print(len(multiprocessing.active_children())) ## always returns the number of threads. If I remove 'q.put' from worker, it returns 0

    # The intent is to wait until all workers are done, but it results in an infinite loop
    # when I remove 'q.put' in the worker it works fine
    #while len(multiprocessing.active_children()) > 0:
    #    time.sleep(1)

    skipped_count = 0
    while not q.empty(): # calculate number of skipped records based on the HTTP responses in the queue
        http_response = json.loads(q.get())
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))
This is most likely because of this documented quirk of multiprocessing.Queue:
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
Basically, you need to make sure that you get() all the items that were put into the Queue in order to guarantee that every process which put something into that Queue will be able to exit.
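As an illustration, here is a minimal sketch of how the original Process/Queue approach could be made to satisfy that (it reuses the worker, q, and read_test_data defined in the question, and assumes each worker puts exactly one response on the queue): drain the queue first, then join the workers.

if __name__ == "__main__":
    q = multiprocessing.Queue()
    jobs = []
    for leads in read_test_data(TEST_FILE):
        p = multiprocessing.Process(target=worker, args=(leads, q))
        jobs.append(p)
        p.start()

    # Each worker puts exactly one response, so getting len(jobs) items drains the queue;
    # q.get() blocks until a response arrives, so this also waits for the workers to finish.
    responses = [q.get() for _ in jobs]

    # Every child's feeder thread has flushed its buffered item by now, so join() no longer hangs.
    for p in jobs:
        p.join()

The collected responses can then be fed through the same skip-counting loop as before.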
I think in this case you're better off using a multiprocessing.Pool and submitting all your jobs to multiprocessing.Pool.map. This simplifies things considerably and gives you complete control over the number of processes running:
def worker(leads):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    return r.text

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count() * 2) # cpu_count() * 2 processes running in the pool
    responses = pool.map(worker, read_test_data(TEST_FILE))

    skipped_count = 0
    for raw_response in responses:
        http_response = json.loads(raw_response)
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))
If you're worried about the memory cost of converting read_test_data(TEST_FILE) into a list (which is required in order to use Pool.map), you can use Pool.imap instead.
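As a rough sketch of what that could look like (reusing the single-argument worker above and keeping the same pool size), Pool.imap does not build the full list of chunks up front; it feeds tasks to the pool and yields the responses one at a time:

if __name__ == "__main__":
    skipped_count = 0
    with multiprocessing.Pool(multiprocessing.cpu_count() * 2) as pool:
        # imap yields each response as it completes, so the responses
        # never have to sit together in one big list.
        for raw_response in pool.imap(worker, read_test_data(TEST_FILE)):
            http_response = json.loads(raw_response)
            for i in http_response['result']:
                if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                    skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))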
Edit:
As I mentioned in a comment above, this use case looks I/O-bound, which means you may get better performance by using a multiprocessing.dummy.Pool (which uses a pool of threads instead of a pool of processes). Try both and see which one is faster.
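For example, a rough sketch of the thread-based variant, again reusing the single-argument worker; the 20 matches the thread cap mentioned in the question. The chunks are copied with list() here because threads share objects directly rather than receiving pickled copies, while read_test_data() reuses one list between yields.

import multiprocessing.dummy

if __name__ == "__main__":
    # Snapshot each chunk: nothing is pickled when using threads, and the
    # generator clears and refills the same list object for every chunk.
    chunks = [list(chunk) for chunk in read_test_data(TEST_FILE)]

    pool = multiprocessing.dummy.Pool(20)  # same Pool API, backed by 20 threads
    responses = pool.map(worker, chunks)
    pool.close()
    pool.join()

From there, the responses can be counted exactly as in the Pool.map example.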