从 Python 中的 multiprocessing.pool.ThreadPool 获取线程索引

Get thread index from a multiprocessing.pool.ThreadPool in Python

我正在使用一个库,其多处理是使用 multiprocessing.pool.ThreadPool(processes) 实现的。

如何从池中get/compute线程索引(从0processes-1)?

我一直在阅读文档并在网上搜索,但没有找到令人信服的解决方案。我可以获得线程 ID(使用 threading.get_ident()),并且可以遍历所有线程以构建它们的索引和 ID 之间的映射,但我需要使用某种 time.sleep() 来确保我浏览他们都...你有什么更好的解决方案吗?

想法是创建一个辅助函数,在下面的示例中称为 test_worker,returns 它的线程标识和调用它的参数,其值为 0 .. . pool size - 1. 然后我们提交任务:

pool.map(test_worker, range(POOLSIZE), 1)

通过将 chunksize 值指定为 1,想法是每个线程将只分配 1 个任务来处理,第一个线程给定参数 0,第二个线程参数 1,等等。我们必须确保test_worker 将处理器的控制权交给池中的其他线程。如果它只包含一个 return 语句,第一个线程可能会结束处理所有任务。本质上,任务被放置在 chunksize 任务列表中的单个队列中,每个池线程取出下一个可用列表并处理列表中的任务,但如果任务如此微不足道,则第一个线程可能实际上可以获取所有列表,因为它永远不会将处理器的控制权交给其他线程。为了避免这种情况,我们在 worker 中插入了对 time.sleep 的调用。

from multiprocessing.pool import ThreadPool
import threading


def test_worker(i):
    # To ensure that the worker gives up control of the processor we sleep.
    # Otherwise, the same thread may be given all the tasks to process.
    time.sleep(.1)
    return threading.get_ident(), i

def real_worker(x):
    # return the argument squared and the id of the thread that did the work
    return x**2, threading.get_ident()

POOLSIZE = 5
with ThreadPool(POOLSIZE) as pool:
    # chunksize = 1 is critical to be sure that we have 1 task per thread:
    thread_dict = {result[0]: result[1]
                   for result in pool.map(test_worker, range(POOLSIZE), 1)}
    assert(len(thread_dict) == POOLSIZE)
    print(thread_dict)
    value, id = pool.apply(real_worker, (7,))
    print(value) # should be 49
    assert (id in thread_dict)
    print('thread index = ', thread_dict[id])

打印:

{16880: 0, 16152: 1, 7400: 2, 13320: 3, 168: 4}
49
thread index =  4

没有使用的版本sleep

from multiprocessing.pool import ThreadPool
import threading
import time

def test_worker(i, event):
    if event:
        event.wait()
    return threading.get_ident(), i

def real_worker(x):
    return x**2, threading.get_ident()


# Let's use a really big pool size for a good test:
POOLSIZE = 500
events = [threading.Event() for _ in range(POOLSIZE - 1)]
with ThreadPool(POOLSIZE) as pool:
    thread_dict = {}
    # These first POOLSIZE - 1 tasks will wait until we set their events
    results = [pool.apply_async(test_worker, args=(i, event)) for i, event in enumerate(events)]
    # This last one is not passed an event and so it does not wait.
    # When it completes, we can be sure the other tasks, which have been submitted before it
    # have already been picked up by the other threads in the pool.
    id, index = pool.apply(test_worker, args=(POOLSIZE - 1, None))
    thread_dict[id] = index
    # let the others complete:
    for event in events:
        event.set()
    for result in results:
        id, index = result.get()
        thread_dict[id] = index
    assert(len(thread_dict) == POOLSIZE)
    value, id = pool.apply(real_worker, (7,))
    print(value) # should be 49
    assert (id in thread_dict)
    print('thread index = ', thread_dict[id])

打印:

49
thread index =  499