How to name threads individually within a multiprocessing.dummy.Pool?

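I would like to name the threads in a multiprocessing.dummy.Pool so that I can see all of their names when I call threading.enumerate() from the main thread. Is there a keyword argument I can pass to pool.apply_async to name the threads? I would prefer to name them at creation rather than inside the tester function, purely for cleanliness.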

For example, given the sample code below:

import multiprocessing.dummy
from time import sleep
import threading

def tester():
    sleep(2)
    print("running \n")

def run_conc(number_of_threads, fxn):
    pool = multiprocessing.dummy.Pool(processes=number_of_threads)
    for _ in range(number_of_threads):
        pool.apply_async(fxn)  # use the function that was passed in
    print(threading.enumerate(), "\n")
    pool.close()
    pool.join()

run_conc(3, tester)

When I run it, I get this output:

[<_MainThread(MainThread, started 140735632434048)>, <Thread(SockThread, started daemon 123145521917952)>, <DummyProcess(Thread-1, started daemon 123145527246848)>, <DummyProcess(Thread-2, started daemon 123145532502016)>, <DummyProcess(Thread-3, started daemon 123145537757184)>, <Thread(Thread-4, started daemon 123145543012352)>, <Thread(Thread-5, started daemon 123145548267520)>, <Thread(Thread-6, started daemon 123145553522688)>] 

running 
running 
running 

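I would like to be able to name the 3 dummy threads in that list so that I can tell which is which. Or perhaps there is a way to do this in concurrent.futures, and I should use that instead?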

If you are fine with renaming the threads as they start, the following snippet may help:

from multiprocessing.dummy import Pool  # thread-based pool, as in the question
from queue import Queue
import threading

thread_names = Queue()
num_threads = 4
for e in ['A', 'B', 'C', 'D']:
    thread_names.put('Thread-{}'.format(e))

def initializer(q):
    # Runs once in each worker thread as it starts up
    threading.current_thread().name = q.get()

if __name__ == '__main__':
    pool = Pool(processes=num_threads, initializer=initializer, initargs=(thread_names,))
    print([t.name for t in threading.enumerate()])
    pool.close()
    pool.join()

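OK, I found that I can change the thread name inside the tester function by assigning to threading.current_thread().name. However, if anyone knows how to set the thread name at creation time, from the pool.apply_async line, I would appreciate it.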
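For reference, the rename-inside-the-task approach described in this comment could look roughly like the sketch below. The label argument and the name_and_report helper are hypothetical, introduced only for illustration. Note that the name then belongs to whichever task last ran on a given worker, not to the worker itself.

```python
import threading
from multiprocessing.dummy import Pool


def name_and_report(label):
    """Rename the worker thread running this task, then return its name."""
    threading.current_thread().name = label
    return threading.current_thread().name


def run_labelled(labels):
    # One task per label; map() returns results in input order.
    with Pool(len(labels)) as pool:
        return pool.map(name_and_report, labels)


if __name__ == '__main__':
    print(run_labelled(['worker-a', 'worker-b']))
```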

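The worker threads in a pool are not created when you call pool.apply_async or any other pool method; they are created earlier, as soon as you instantiate the pool. Calling a pool method simply uses the threads that already exist.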
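A small sketch that makes this observable: it counts the live DummyProcess threads (the worker class visible in the question's output) right after the pool is constructed and before any task is submitted. The helper names count_pool_workers and workers_after_instantiation are made up for this example.

```python
import threading
from multiprocessing.dummy import DummyProcess
from multiprocessing.pool import ThreadPool


def count_pool_workers():
    """Count live threads that are pool workers (DummyProcess instances)."""
    return sum(isinstance(t, DummyProcess) for t in threading.enumerate())


def workers_after_instantiation(size):
    """Create a pool, submit nothing, and report how many workers now exist."""
    before = count_pool_workers()
    pool = ThreadPool(size)
    try:
        return count_pool_workers() - before
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    print(workers_after_instantiation(3))
```

Since the workers are already running at this point, nothing passed to apply_async can influence how they were named when they were created.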

Without touching the library source, there is no way to name the threads when they are initialized. Your options are:

  • renaming the worker threads after the pool has been instantiated
  • monkey-patching the pool internals to enforce a specific naming template

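The first option is simple to implement: iterate over the pool instance's ._pool attribute (a private attribute holding the worker threads) and change the .name of each thread it contains.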

from multiprocessing.pool import ThreadPool as Pool


if __name__ == '__main__':

    pool = Pool(4)
    print([w.name for w in pool._pool])
    # ['Thread-1', 'Thread-2', 'Thread-3', 'Thread-4']

    for w in pool._pool:
        w.name = w.name.replace('Thread', 'ThreadPoolWorker')

    print([w.name for w in pool._pool])
    # ['ThreadPoolWorker-1', 'ThreadPoolWorker-2', 'ThreadPoolWorker-3', 'ThreadPoolWorker-4']

    pool.close()
    pool.join()

Note that I use multiprocessing.pool.ThreadPool here only to stay consistent with the example for the second option below, since multiprocessing.dummy.Pool is just a wrapper around ThreadPool.

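For the second option, you can patch ThreadPool.Process, the factory function for worker threads, with a wrapper that replaces the 'Thread' part of the default name 'Thread-%d' (where %d is filled in from a counter) with something more meaningful, such as 'ThreadPoolWorker'.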

# threadpool.py
# Module patching the name of worker-threads within ThreadPool

__all__ = ['ThreadPool']

from functools import wraps
from multiprocessing.pool import ThreadPool


def rename_worker(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        w = fn(*args, **kwargs)
        w.name = w.name.replace('Thread', 'ThreadPoolWorker')
        return w
    return wrapper


ThreadPool.Process = staticmethod(rename_worker(ThreadPool.Process))

Usage:

from threadpool import ThreadPool as Pool


if __name__ == '__main__':

    pool = Pool(4)
    print([w.name for w in pool._pool])
    # ['ThreadPoolWorker-1', 'ThreadPoolWorker-2', 'ThreadPoolWorker-3', 'ThreadPoolWorker-4']
    pool.close()
    pool.join()
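
On the concurrent.futures alternative raised in the question: ThreadPoolExecutor accepts a thread_name_prefix argument (Python 3.6 and later), so its workers get a recognizable name at creation without touching any private attributes. A minimal sketch, where the 'tester' prefix and the helper names are arbitrary choices for illustration:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def current_name(_):
    """Return the name of the worker thread running this task."""
    return threading.current_thread().name


def run_named(prefix, workers):
    # Workers are named from the prefix plus a counter assigned by the executor.
    with ThreadPoolExecutor(max_workers=workers,
                            thread_name_prefix=prefix) as executor:
        return list(executor.map(current_name, range(workers)))


if __name__ == '__main__':
    print(run_named('tester', 3))
```

This names the workers as a group rather than one by one; for fully individual names, one of the renaming approaches above is still needed.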