Python - multiprocessing threads don't close when using Queue

This is for Python 3.x.

I'm loading records from a CSV file in chunks of 300 and then spawning worker processes to submit them to a REST API. I save each HTTP response in a Queue so that I can count the number of skipped records once the whole CSV file has been processed. However, after I added the Queue to the workers, the processes no longer seem to close. I want to monitor the number of processes for two reasons: (1) once they are all done, I can calculate and display the skip count, and (2) I'd like to enhance the script so that it spawns no more than 20 or so processes at a time, so I don't run out of memory.

I have 2 questions: (1) why don't the worker processes close once they put their responses on the Queue, and (2) how can I limit the number of processes running at once?

Here is my code (somewhat simplified, since I can't share the exact details of the API I'm calling):

import requests, json, csv, time, datetime, multiprocessing

TEST_FILE = 'file.csv'

def read_test_data(path, chunksize=300):
    leads = []
    with open(path, 'rU') as data:
        reader = csv.DictReader(data)
        for index, row in enumerate(reader):
            if (index % chunksize == 0 and index > 0):
                yield leads
                del leads[:]
            leads.append(row)
        yield leads

def worker(leads, q):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    q.put(r.text) # this puts the response in a queue for later analysis
    return

if __name__ == "__main__":
    q = multiprocessing.Queue() # this is a queue to put all HTTP responses in, so we count the skips
    jobs = []
    for leads in read_test_data(TEST_FILE): # This function reads a CSV file and provides 300 records at a time
        p = multiprocessing.Process(target=worker, args=(leads,q,))
        jobs.append(p)
        p.start()
    time.sleep(20) # checking if processes are closing automatically (they don't)
    print(len(multiprocessing.active_children())) # always equals the number of started processes; if I remove 'q.put' from worker, it returns 0

    # The intent is to wait until all workers are done, but it results in an infinite loop
    # when I remove 'q.put' in the worker it works fine
    #while len(multiprocessing.active_children()) > 0:  # 
    #    time.sleep(1)

    skipped_count = 0
    while not q.empty(): # calculate number of skipped records based on the HTTP responses in the queue
        http_response = json.loads(q.get())
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))

This is most likely because of this documented quirk of multiprocessing.Queue:

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

Basically, you need to make sure that you get() every item that was put on the Queue in order to guarantee that every process which put something onto it can exit.
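Applied to the structure of the original script, that drain-then-join ordering looks roughly like this (a sketch reusing worker(), read_test_data() and TEST_FILE from the question, and assuming every worker puts exactly one response on the queue):

if __name__ == "__main__":
    q = multiprocessing.Queue()
    jobs = []
    for leads in read_test_data(TEST_FILE):
        p = multiprocessing.Process(target=worker, args=(leads, q))
        jobs.append(p)
        p.start()

    # Drain the queue first: one get() per started process lets the
    # feeder threads flush their buffers so the workers can exit.
    responses = [q.get() for _ in jobs]

    # Only now is joining safe; this no longer blocks forever.
    for p in jobs:
        p.join()

    print(len(multiprocessing.active_children()))  # now prints 0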

I think in this situation you're better off using multiprocessing.Pool and submitting all of your work via multiprocessing.Pool.map. That simplifies things considerably and gives you complete control over the number of processes running:

def worker(leads):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    return r.text

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count() * 2)  # cpu_count() * 2 processes running in the pool
    # Copy each chunk before dispatching: read_test_data() reuses (and clears)
    # a single list object, and Pool.map materializes the generator up front,
    # so without copies every task would see only the final chunk.
    responses = pool.map(worker, [list(chunk) for chunk in read_test_data(TEST_FILE)])

    skipped_count = 0
    for raw_response in responses:
        http_response = json.loads(raw_response)
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))

If you're concerned about the memory cost of converting read_test_data(TEST_FILE) into a list (which is required in order to use Pool.map), you can use Pool.imap instead.
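A sketch of that variant, keeping the same skip-counting logic (chunks are still copied defensively, as in the map example above, but via a generator expression so nothing is materialized up front):

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count() * 2)

    # Lazily copy each chunk; imap pulls from this generator as it dispatches
    # work and yields responses one at a time, in submission order.
    chunks = (list(chunk) for chunk in read_test_data(TEST_FILE))

    skipped_count = 0
    for raw_response in pool.imap(worker, chunks):
        http_response = json.loads(raw_response)
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1

    pool.close()
    pool.join()
    print("Number of records skipped: " + str(skipped_count))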

Edit:

As I mentioned in a comment above, this use case looks I/O-bound, which means you may see better performance from multiprocessing.dummy.Pool (which uses a thread pool instead of a process pool). Try both and see which is faster.
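A sketch of the thread-pool variant, assuming the same worker and read_test_data as above; only the import and the pool construction change, and the pool size of 20 is just an example matching the limit mentioned in the question:

import multiprocessing.dummy

if __name__ == "__main__":
    # Same Pool API, but the workers are threads in this process, which is
    # usually cheap enough for I/O-bound HTTP calls.
    pool = multiprocessing.dummy.Pool(20)

    # Same defensive copy as in the process-based examples: the generator
    # reuses one list object, and threads would otherwise share it directly.
    responses = pool.map(worker, [list(chunk) for chunk in read_test_data(TEST_FILE)])

    pool.close()
    pool.join()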