Python multiprocessing.dummy 线程池在未初始化线程的情况下使用 `map` 运行更多任务
Python multiprocessing.dummy thread pool runs more tasks with `map` without thread initialized
我有以下代码。我认为在执行任务之前初始化器是 运行,但显然我收到错误指示某些任务是 运行 而没有初始化该线程。
import threading
import random
from multiprocessing.dummy import Pool, Value, Queue, Manager
def init_worker():
global thread_local
thread_local = threading.local()
thread_local.worker_idx = random.randint(0, 10)
print("++++++++++++++++++++++++ worker %s" % thread_local.worker_idx)
def run(idx):
print(dir(thread_local))
worker_idx = thread_local.worker_idx
print("==================== TASK ID %s by worker %s ====================" % (idx, worker_idx))
pool = Pool(2, init_worker)
pool.map(run, range(10), chunksize=1)
输出:
++++++++++++++++++++++++ worker 1
++++++++++++++++++++++++ worker 7
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 0 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 2 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
==================== TASK ID 3 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 4 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 5 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 7 by worker 7 ====================
Traceback (most recent call last):
File "test.py", line 19, in <module>
pool.map(run, range(10), chunksize=1)
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 266, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "test.py", line 14, in run
worker_idx = thread_local.worker_idx
AttributeError: '_thread._local' object has no attribute 'worker_idx'
所以看起来两个线程都已正确初始化,但更多的任务是在没有预先 运行 初始化程序的情况下启动的。 print(dir(thread_local))
的输出很不一致。
看起来问题出在初始化器上。请注意,没有 TASK ID ... by worker 1
打印输出,即使很明显线程一创建了一个 globalthread 本地对象并分配给了 worker_idx
属性。这是因为 both 线程尝试创建本地线程 global 并且线程 7 覆盖了线程 1 创建的线程本地对象(不是 worker_idx
attr),因此销毁了线程 1 的 worker_idx
。相反,尝试在主线程(调用 map 的线程)中创建全局变量。并且只在线程初始值中分配 worker_idx
。
我有以下代码。我认为在执行任务之前初始化器是 运行,但显然我收到错误指示某些任务是 运行 而没有初始化该线程。
import threading
import random
from multiprocessing.dummy import Pool, Value, Queue, Manager
def init_worker():
global thread_local
thread_local = threading.local()
thread_local.worker_idx = random.randint(0, 10)
print("++++++++++++++++++++++++ worker %s" % thread_local.worker_idx)
def run(idx):
print(dir(thread_local))
worker_idx = thread_local.worker_idx
print("==================== TASK ID %s by worker %s ====================" % (idx, worker_idx))
pool = Pool(2, init_worker)
pool.map(run, range(10), chunksize=1)
输出:
++++++++++++++++++++++++ worker 1
++++++++++++++++++++++++ worker 7
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 0 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 2 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
==================== TASK ID 3 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 4 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 5 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 7 by worker 7 ====================
Traceback (most recent call last):
File "test.py", line 19, in <module>
pool.map(run, range(10), chunksize=1)
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 266, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "test.py", line 14, in run
worker_idx = thread_local.worker_idx
AttributeError: '_thread._local' object has no attribute 'worker_idx'
所以看起来两个线程都已正确初始化,但更多的任务是在没有预先 运行 初始化程序的情况下启动的。 print(dir(thread_local))
的输出很不一致。
看起来问题出在初始化器上。请注意,没有 TASK ID ... by worker 1
打印输出,即使很明显线程一创建了一个 globalthread 本地对象并分配给了 worker_idx
属性。这是因为 both 线程尝试创建本地线程 global 并且线程 7 覆盖了线程 1 创建的线程本地对象(不是 worker_idx
attr),因此销毁了线程 1 的 worker_idx
。相反,尝试在主线程(调用 map 的线程)中创建全局变量。并且只在线程初始值中分配 worker_idx
。