从 Python 中的 multiprocessing.pool.ThreadPool 获取线程索引
Get thread index from a multiprocessing.pool.ThreadPool in Python
我正在使用一个库,其多处理是使用 multiprocessing.pool.ThreadPool(processes)
实现的。
如何从池中获取/计算线程索引(从 0 到 processes-1)?
我一直在阅读文档并在网上搜索,但没有找到令人信服的解决方案。我可以获得线程 ID(使用 threading.get_ident()
),并且可以遍历所有线程以构建它们的索引和 ID 之间的映射,但我需要使用某种 time.sleep()
来确保我遍历了所有线程……你有什么更好的解决方案吗?
想法是创建一个辅助函数,在下面的示例中称为 test_worker
,它返回自己的线程标识以及调用它时传入的参数,参数取值为 0 到 pool size - 1。然后我们提交任务:
pool.map(test_worker, range(POOLSIZE), 1)
通过将 chunksize
值指定为 1,想法是每个线程将只分配 1 个任务来处理,第一个线程给定参数 0,第二个线程参数 1,等等。我们必须确保test_worker
将处理器的控制权交给池中的其他线程。如果它只包含一个 return
语句,第一个线程可能会结束处理所有任务。本质上,任务被放置在 chunksize
任务列表中的单个队列中,每个池线程取出下一个可用列表并处理列表中的任务,但如果任务如此微不足道,则第一个线程可能实际上可以获取所有列表,因为它永远不会将处理器的控制权交给其他线程。为了避免这种情况,我们在 worker 中插入了对 time.sleep
的调用。
from multiprocessing.pool import ThreadPool
import threading
def test_worker(i):
# To ensure that the worker gives up control of the processor we sleep.
# Otherwise, the same thread may be given all the tasks to process.
time.sleep(.1)
return threading.get_ident(), i
def real_worker(x):
# return the argument squared and the id of the thread that did the work
return x**2, threading.get_ident()
POOLSIZE = 5
with ThreadPool(POOLSIZE) as pool:
# chunksize = 1 is critical to be sure that we have 1 task per thread:
thread_dict = {result[0]: result[1]
for result in pool.map(test_worker, range(POOLSIZE), 1)}
assert(len(thread_dict) == POOLSIZE)
print(thread_dict)
value, id = pool.apply(real_worker, (7,))
print(value) # should be 49
assert (id in thread_dict)
print('thread index = ', thread_dict[id])
打印:
{16880: 0, 16152: 1, 7400: 2, 13320: 3, 168: 4}
49
thread index = 4
不使用 sleep 的版本
from multiprocessing.pool import ThreadPool
import threading
import time
def test_worker(i, event):
if event:
event.wait()
return threading.get_ident(), i
def real_worker(x):
return x**2, threading.get_ident()
# Let's use a really big pool size for a good test:
POOLSIZE = 500
events = [threading.Event() for _ in range(POOLSIZE - 1)]
with ThreadPool(POOLSIZE) as pool:
thread_dict = {}
# These first POOLSIZE - 1 tasks will wait until we set their events
results = [pool.apply_async(test_worker, args=(i, event)) for i, event in enumerate(events)]
# This last one is not passed an event and so it does not wait.
# When it completes, we can be sure the other tasks, which have been submitted before it
# have already been picked up by the other threads in the pool.
id, index = pool.apply(test_worker, args=(POOLSIZE - 1, None))
thread_dict[id] = index
# let the others complete:
for event in events:
event.set()
for result in results:
id, index = result.get()
thread_dict[id] = index
assert(len(thread_dict) == POOLSIZE)
value, id = pool.apply(real_worker, (7,))
print(value) # should be 49
assert (id in thread_dict)
print('thread index = ', thread_dict[id])
打印:
49
thread index = 499
我正在使用一个库,其多处理是使用 multiprocessing.pool.ThreadPool(processes)
实现的。
如何从池中获取/计算线程索引(从 0 到 processes-1)?
我一直在阅读文档并在网上搜索,但没有找到令人信服的解决方案。我可以获得线程 ID(使用 threading.get_ident()
),并且可以遍历所有线程以构建它们的索引和 ID 之间的映射,但我需要使用某种 time.sleep()
来确保我遍历了所有线程……你有什么更好的解决方案吗?
想法是创建一个辅助函数,在下面的示例中称为 test_worker
,它返回自己的线程标识以及调用它时传入的参数,参数取值为 0 到 pool size - 1。然后我们提交任务:
pool.map(test_worker, range(POOLSIZE), 1)
通过将 chunksize
值指定为 1,想法是每个线程将只分配 1 个任务来处理,第一个线程给定参数 0,第二个线程参数 1,等等。我们必须确保test_worker
将处理器的控制权交给池中的其他线程。如果它只包含一个 return
语句,第一个线程可能会结束处理所有任务。本质上,任务被放置在 chunksize
任务列表中的单个队列中,每个池线程取出下一个可用列表并处理列表中的任务,但如果任务如此微不足道,则第一个线程可能实际上可以获取所有列表,因为它永远不会将处理器的控制权交给其他线程。为了避免这种情况,我们在 worker 中插入了对 time.sleep
的调用。
from multiprocessing.pool import ThreadPool
import threading
def test_worker(i):
# To ensure that the worker gives up control of the processor we sleep.
# Otherwise, the same thread may be given all the tasks to process.
time.sleep(.1)
return threading.get_ident(), i
def real_worker(x):
# return the argument squared and the id of the thread that did the work
return x**2, threading.get_ident()
POOLSIZE = 5
with ThreadPool(POOLSIZE) as pool:
# chunksize = 1 is critical to be sure that we have 1 task per thread:
thread_dict = {result[0]: result[1]
for result in pool.map(test_worker, range(POOLSIZE), 1)}
assert(len(thread_dict) == POOLSIZE)
print(thread_dict)
value, id = pool.apply(real_worker, (7,))
print(value) # should be 49
assert (id in thread_dict)
print('thread index = ', thread_dict[id])
打印:
{16880: 0, 16152: 1, 7400: 2, 13320: 3, 168: 4}
49
thread index = 4
不使用 sleep 的版本
from multiprocessing.pool import ThreadPool
import threading
import time
def test_worker(i, event):
if event:
event.wait()
return threading.get_ident(), i
def real_worker(x):
return x**2, threading.get_ident()
# Let's use a really big pool size for a good test:
POOLSIZE = 500
events = [threading.Event() for _ in range(POOLSIZE - 1)]
with ThreadPool(POOLSIZE) as pool:
thread_dict = {}
# These first POOLSIZE - 1 tasks will wait until we set their events
results = [pool.apply_async(test_worker, args=(i, event)) for i, event in enumerate(events)]
# This last one is not passed an event and so it does not wait.
# When it completes, we can be sure the other tasks, which have been submitted before it
# have already been picked up by the other threads in the pool.
id, index = pool.apply(test_worker, args=(POOLSIZE - 1, None))
thread_dict[id] = index
# let the others complete:
for event in events:
event.set()
for result in results:
id, index = result.get()
thread_dict[id] = index
assert(len(thread_dict) == POOLSIZE)
value, id = pool.apply(real_worker, (7,))
print(value) # should be 49
assert (id in thread_dict)
print('thread index = ', thread_dict[id])
打印:
49
thread index = 499