How do I return a thread's first argument along with the thread's return value from a queue?
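Below is my Threader class, which I use to thread an arbitrary number of functions and then, after joining the threads, return a list of the threaded functions' return values. One feature I want is the option to return a dictionary instead of a list. I found a way to do that by requiring each threaded function to return a tuple, whose first value is then used as the key. I would rather have the first argument of the threaded function be used as the key.
I learned that threads can be named, so I set each thread's name to the threaded function's first argument when the thread is created. The thread itself can access its name with getName(), but how do I get the name of the thread whose result is next in line for .get() from the queue? (How do I access the thread objects within the queue?)
I just need it to work as described in the first paragraph, so I'm open to alternative ways of achieving the same effect.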
from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        thread_queue (Queue): The queue that holds the threads.
        threads (Thread list): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.thread_queue = Queue()
        self.threads = []

    def add_thread(self, func, args):
        """add a function to be threaded"""
        self.threads.append(Thread(
            name=args[0],  # Custom name using function's first argument
            target=lambda queue, func_args: queue.put(func(*func_args)),
            args=(self.thread_queue, args)))
        self.threads[-1].start()

    def get_results(self, return_dict=False):
        """block threads until all are done, then return their results

        Args:
            return_dict (bool): Return a dict instead of a list. Requires
                each thread to return a tuple with two values.
        """
        for thread in self.threads:
            thread.join()
        if return_dict:
            results = {}
            while not self.thread_queue.empty():
                # Setting the dictionary key with returned tuple
                # How to access thread's name?
                key, value = self.thread_queue.get()
                results[key] = value
        else:
            results = []
            while not self.thread_queue.empty():
                results.append(self.thread_queue.get())
        return results
Usage example:
threader = Threader()
for region in regions:
    # probe_region is a function, and (region, tag_filter) are args for it
    threader.add_thread(probe_region, (region, tag_filter))
results = threader.get_results()
Edit: What I'm currently using:
My cleaned-up and improved version (return values are sorted by thread insertion order):
from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        result_queue (Queue): Thread-safe queue that holds the results.
        threads (list[Thread]): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.result_queue = Queue()
        self.threads = []

    def worker(self, index, func, fargs):
        """run the threaded function and queue its return value

        The index of the thread and the threaded function's first arg are
        inserted into the queue, preceding the threaded function's return.

        Args:
            index (int): Insertion index of the thread.
            func, fargs: See add_thread
        """
        self.result_queue.put([index, fargs[0], func(*fargs)])

    def add_thread(self, func, fargs):
        """add a function to be threaded

        Args:
            func (function): Function to thread.
            fargs (tuple): Argument(s) to pass to the func function.

        Raises:
            ValueError: If func isn't callable, or if fargs not a tuple.
        """
        if not callable(func):
            raise ValueError("func must be a function.")
        if not isinstance(fargs, tuple) or not fargs:
            raise ValueError("fargs must be a non-empty tuple.")
        # Take the index here, in the calling thread. Reading
        # len(self.threads) inside worker would race with later
        # add_thread calls and could yield a wrong or duplicate index.
        index = len(self.threads)
        self.threads.append(
            Thread(target=self.worker, args=(index, func, fargs)))
        self.threads[-1].start()

    def get_results(self, return_dict=False):
        """block until all threads finish, sort by thread index, return results

        Args:
            return_dict (bool): Return dict instead of list. Threads'
                function's first argument used as key.
        """
        for thread in self.threads:
            thread.join()
        thread_data = []
        while not self.result_queue.empty():
            thread_data.append(self.result_queue.get())
        thread_data.sort(key=lambda item: item[0])
        if return_dict:
            results = {}
            for _, key, thread_return in thread_data:
                results[key] = thread_return
        else:
            results = []
            for _, _, thread_return in thread_data:
                results.append(thread_return)
        return results
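If you only want to achieve the result outlined in your first paragraph, where the first argument is used as the key, you can modify your code as follows: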
from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        results_queue (Queue): The thread-safe queue that holds the results.
        threads (Thread list): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.results_queue = Queue()
        self.threads = []

    def worker(self, func, args):
        """run the function and save its results"""
        result = func(*args)
        # save result, along with a key for use later if needed (first argument)
        self.results_queue.put([args[0], result])

    def add_thread(self, func, fargs):
        """add a function to be threaded"""
        self.threads.append(Thread(target=self.worker, args=(func, fargs)))
        self.threads[-1].start()

    def get_results(self, return_dict=False):
        """block threads until all are done, then return their results

        Args:
            return_dict (bool): Return a dict instead of a list, keyed by
                the first argument passed to each threaded function.
        """
        for thread in self.threads:
            thread.join()
        if return_dict:
            results = {}
            while not self.results_queue.empty():
                # set the dictionary key as first argument passed to worker
                key, value = self.results_queue.get()
                results[key] = value
        else:
            results = []
            while not self.results_queue.empty():
                # the key is not needed for the list result
                _, value = self.results_queue.get()
                results.append(value)
        return results
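Note that there is no need to store the threads themselves in the queue; storing the results is enough. (A queue is a good choice for holding the results because it takes care of access synchronization.)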
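As a rough sketch of how this relates to the thread-name idea in the question: the queue only ever holds results, so a name cannot be read off the queue, but a worker can look up its own thread with threading.current_thread() and put the name next to its result. The run_named helper and the square function below are hypothetical names used only for illustration; they are not part of the Threader class.

```python
from queue import Queue
from threading import Thread, current_thread


def run_named(jobs):
    """Run each (func, args) job in its own thread; key results by thread name.

    Hypothetical helper for illustration only.
    """
    results_queue = Queue()

    def worker(func, args):
        # current_thread() returns the Thread object executing this call,
        # so the name given at creation time is available here.
        results_queue.put((current_thread().name, func(*args)))

    threads = [
        Thread(name=str(args[0]), target=worker, args=(func, args))
        for func, args in jobs
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    results = {}
    while not results_queue.empty():
        name, value = results_queue.get()
        results[name] = value
    return results


def square(x):
    # Stand-in for real work.
    return x * x


named_results = run_named([(square, (2,)), (square, (3,))])
print(named_results)
```

One trade-off with this route is that thread names are strings, so the keys come back as strings rather than as the original argument objects; putting the argument itself into the queue avoids that.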
In the worker() function you can generate the key however you like; in the code above I used the first argument, as you suggested.
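To illustrate generating the key some other way, here is a minimal sketch that assumes an optional key_func parameter on add_thread. That parameter is not in the code above; it is an assumption made for this example, as are the KeyedThreader and probe names and the placeholder region strings. When key_func is omitted, the first argument is used as the key, as before.

```python
from queue import Queue
from threading import Thread


class KeyedThreader(object):
    """Variant of Threader whose key can be derived by an optional callable.

    The key_func parameter is an assumption made for this sketch.
    """

    def __init__(self):
        self.results_queue = Queue()
        self.threads = []

    def worker(self, func, args, key_func):
        # Default to the first argument when no key_func is supplied.
        key = key_func(*args) if key_func is not None else args[0]
        self.results_queue.put((key, func(*args)))

    def add_thread(self, func, args, key_func=None):
        thread = Thread(target=self.worker, args=(func, args, key_func))
        self.threads.append(thread)
        thread.start()

    def get_results(self):
        """Join all threads, then return a dict of key -> result."""
        for thread in self.threads:
            thread.join()
        results = {}
        while not self.results_queue.empty():
            key, value = self.results_queue.get()
            results[key] = value
        return results


def probe(region, tag):
    # Stand-in for real work.
    return "{} probed with {}".format(region, tag)


keyed = KeyedThreader()
keyed.add_thread(probe, ("us-east", "web"))
keyed.add_thread(probe, ("eu-west", "db"),
                 key_func=lambda region, tag: (region, tag))
keyed_results = keyed.get_results()
```

Whatever the key is, it has to be hashable to serve as a dictionary key, and two threads that produce the same key will overwrite each other's entry.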
An example of usage:
def foo(*args):
    return "foo() " + repr(len(args))

def bar(*args):
    return "bar() " + repr(len(args))

def baz(*args):
    return "baz() " + repr(len(args))

threader = Threader()
threader.add_thread(foo, ["foo_key", "a"])
threader.add_thread(bar, ["bar_key", "b", "c"])
threader.add_thread(baz, ["baz_key", "d", "e", "f"])
print(threader.get_results(True))
This gives the output:
{'foo_key': 'foo() 2', 'bar_key': 'bar() 3', 'baz_key': 'baz() 4'}
Hope this helps.