Python multiprocessing.dummy 线程池在未初始化线程的情况下使用 `map` 运行更多任务

Python multiprocessing.dummy thread pool runs more tasks with `map` without thread initialized

我有以下代码。我认为在执行任务之前初始化器是 运行,但显然我收到错误指示某些任务是 运行 而没有初始化该线程。

import threading
import random
from multiprocessing.dummy import Pool, Value, Queue, Manager

def init_worker():
    global thread_local
    thread_local = threading.local()
    thread_local.worker_idx = random.randint(0, 10)
    print("++++++++++++++++++++++++ worker %s" %  thread_local.worker_idx)


def run(idx):
    print(dir(thread_local))
    worker_idx = thread_local.worker_idx
    print("==================== TASK ID %s by worker %s ====================" % (idx, worker_idx))


pool = Pool(2, init_worker)
pool.map(run, range(10), chunksize=1)

输出:

++++++++++++++++++++++++ worker 1
++++++++++++++++++++++++ worker 7
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 0 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 2 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
==================== TASK ID 3 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 4 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 5 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 7 by worker 7 ====================
Traceback (most recent call last):
  File "test.py", line 19, in <module>
    pool.map(run, range(10), chunksize=1)
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "test.py", line 14, in run
    worker_idx = thread_local.worker_idx
AttributeError: '_thread._local' object has no attribute 'worker_idx'

所以看起来两个线程都已正确初始化,但更多的任务是在没有预先 运行 初始化程序的情况下启动的。 print(dir(thread_local))的输出很不一致。

看起来问题出在初始化器上。请注意,没有 TASK ID ... by worker 1 打印输出,即使很明显线程一创建了一个 globalthread 本地对象并分配给了 worker_idx 属性。这是因为 both 线程尝试创建本地线程 global 并且线程 7 覆盖了线程 1 创建的线程本地对象(不是 worker_idx attr),因此销毁了线程 1 的 worker_idx。相反,尝试在主线程(调用 map 的线程)中创建全局变量。并且只在线程初始值中分配 worker_idx