Is there a way to change/add queue contents dynamically while processing tasks using threading in python

I am new to multithreading, but I have learned that it could be very useful for my use case. I have an initial queue of tasks to run, and the program approach below would help me.

from queue import Queue
from threading import Thread

def do_stuff(q):
  while True:
    print(q.get())
    q.task_done()

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
  worker = Thread(target=do_stuff, args=(q,))
  worker.daemon = True
  worker.start()

for x in range(100):
  q.put(x)

q.join()

I have researched a lot about whether we can change/add tasks in a queue while it is being processed, but found nothing. My process initially has some tasks, and once those are done there are further tasks to run (the dependencies span nearly thousands of tasks). So I want to keep adding tasks to the queue based on the success/failure of previous tasks, while capping the number of concurrent threads.
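For what it's worth, a minimal sketch of exactly this pattern: `queue.Queue` is thread-safe, so the workers themselves may `put()` follow-up tasks, and `q.join()` will not return until those late additions are also marked done. The `succeeded` check and the "+10 successor" rule below are made up purely for illustration:

```python
import queue
import threading

q = queue.Queue()
results = []
lock = threading.Lock()

def succeeded(task):
    # placeholder for real work; here, even-numbered tasks "succeed"
    return task % 2 == 0

def worker():
    while True:
        task = q.get()
        if succeeded(task):
            with lock:
                results.append(task)
            if task < 100:
                q.put(task + 10)  # enqueue the dependent follow-up task
        q.task_done()

for _ in range(10):
    threading.Thread(target=worker, daemon=True).start()

for x in range(10):  # seed the initial tasks
    q.put(x)

q.join()  # blocks until every task, including ones added mid-run, is done
```

The key point is that `q.join()` waits on the count of unfinished tasks, not on the initial batch, so tasks enqueued by workers are covered automatically.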

UPDATE

Based on all your comments, it now appears that what you have are 10 independent sets of values forming 10 dependency chains:

Chain 1: [1, 11, 21, 31, ...]
Chain 2: [2, 12, 22, 32, ...]
...
Chain 10: [10, 20, 30, 40, ...]

You can run the first value of each chain as concurrent tasks in a thread pool (i.e. 1, 2, ... 10). If a task completes successfully, you run the next value in the chain; otherwise you are done with that chain, since each successive value in a chain runs only if the previous value completed successfully.

Once you come up with a way of expressing these dependency chains, this becomes quite simple:

from multiprocessing.pool import ThreadPool as Pool

def process_x_value(x):
    """
    Process current x value.
    Note that this is invoked by a simple call from run_dependency_chain,
    which is already threaded.
    This function must not be CPU-intensive or else you will not achieve any
    level of concurrency using multithreading.
    """
    import time
    time.sleep(.1) # simulate some I/O
    # return success or failure
    return True # success

def run_dependency_chain(x):
    """
    Process value x, if sucessful process next x value that was dependent
    on successful completion.
    Repeat until there is no next x value (end of dependency chain).
    """
    while True:
        result = process_x_value(x)
        if not result: # failure
            return
        results[x] = True # just store successful results
        x = next_x.get(x)
        if x is None:
            return


# we will be running 10 concurrent dependency chains:
# if task 1 completes successfully, next task to run is 11
# if task 2 completes successfully, next task to run is 12
# ...
# if task 10 completes successfully, next task to run is 20
"""
Thus the successor task can be computed by adding 10 to the current task,
but we will assume in general a more complicated relationship is possible. So we will
use a quasi-linked list of dependencies implemented using a dictionary, next_x,
where next_x[x] gives the successor x to be run on successful completion
of task x.
"""
# at most 2000 successful tasks:
next_x = {x: x + 10 for x in range(1, 1991)}

# to hold results, if you are interested:
results = {}
pool = Pool(10)
pool.map(run_dependency_chain, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(len(results)) # number of successful results

Prints:

2000

If process_x_value is sufficiently I/O-bound, multithreading should reduce your running time by nearly a factor of 10.
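As a side note, roughly the same sketch can be written with the standard library's concurrent.futures.ThreadPoolExecutor instead of multiprocessing.pool.ThreadPool, if you prefer that API (same hypothetical next_x successor table as above, with a shortened sleep):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def process_x_value(x):
    time.sleep(.001)  # simulate some I/O
    return True       # success

# same quasi-linked list of successors as above
next_x = {x: x + 10 for x in range(1, 1991)}
results = {}

def run_dependency_chain(x):
    while True:
        if not process_x_value(x):
            return
        results[x] = True
        x = next_x.get(x)
        if x is None:
            return

with ThreadPoolExecutor(max_workers=10) as pool:
    # leaving the with-block waits for all 10 chains to finish
    list(pool.map(run_dependency_chain, range(1, 11)))

print(len(results))  # prints 2000
```

Exiting the `with` block calls `shutdown(wait=True)`, so all chains are guaranteed complete before the final print.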