如何避免将来在不同时间启动(非常短的)动作时启动数百个线程

How to avoid to start hundreds of threads when starting (very short) actions at different timings in the future

我用这个方法在以后的不同时间发起了几十次(不到几千次)do_it的调用:

import threading
timers = []
while True:
    for i in range(20):
        t = threading.Timer(i * 0.010, do_it, [i])    # I pass the parameter i to function do_it
        t.start()
        timers.append(t)  # so that they can be cancelled if needed
    wait_for_something_else() # this can last from 5 ms to 20 seconds

每个 do_it 调用的运行时间非常快(远小于 0.1 毫秒)且无阻塞。我想避免 为这样一个简单的任务生成数百个新线程

对于所有 do_it 调用,我如何使用 只有一个额外的线程 来做到这一点?

有没有使用 Python 的简单方法,无需第三方库,仅使用标准库?

据我了解,您需要一个工作线程来处理提交的任务,而不是按照提交的顺序,而是按照某种优先顺序。这似乎是线程安全的工作 queue.PriorityQueue.

from dataclasses import dataclass, field
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[PrioritizedItem]):
    while True:
        do_it(q.get().item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    for i in range(20):
        q.put(PrioritizedItem(priority=i * 0.010, item=i))
    wait_for_something_else()

此代码假定您希望永远运行。如果没有,可以在thread_worker中的q.get加上超时,在queue.Empty抛出异常时return因为超时过期。这样您就可以在处理完所有作业且超时已过后加入 queue/thread。

如果你想等到未来的某个特定时间运行 任务,它会变得有点复杂。这是一种通过在工作线程中休眠直到指定时间到达来扩展上述方法的方法,但请注意 time.sleep is only as accurate as your OS allows it to be.

from dataclasses import astuple, dataclass, field
from datetime import datetime, timedelta
from time import sleep
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class TimedItem:
    when: datetime
    item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[TimedItem]):
    while True:
        when, item = astuple(q.get())
        sleep_time = (when - datetime.now()).total_seconds()
        if sleep_time > 0:
            sleep(sleep_time)
        do_it(item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    now = datetime.now()
    for i in range(20):
        q.put(TimedItem(when=now + timedelta(seconds=i * 0.010), item=i))
    wait_for_something_else()

为了仅使用一个额外的线程来解决这个问题,我们必须在该线程中休眠,因此在 worker 休眠时,具有更高优先级的新任务可能会进入。在那种情况下,工作人员将在 完成当前任务后 处理新的高优先级任务。上面的代码假设场景不会发生,根据问题描述,这似乎是合理的。如果可能发生这种情况,您可以更改睡眠代码以重复轮询优先级队列前面的任务是否到期。像这样的轮询方法的缺点是它会更加 CPU 密集。

另外,如果你能保证任务提交给worker后,任务的相对顺序不会改变,那么你可以用普通的queue.Queue替换优先级队列来简化有点代码。

这些 do_it 任务可以通过从队列中删除来取消。

以上代码已使用以下模拟定义进行测试:

def do_it(x):
    print(x)

def wait_for_something_else():
    sleep(5)

正如 smcjones 所指出的,另一种不使用额外线程的方法是使用 asyncio。这是一种使用 asyncio 的方法,它在将来的特定时间通过使用 loop.call_later:

调用 do_it
import asyncio


def do_it(x):
    print(x)


async def wait_for_something_else():
    await asyncio.sleep(5)


async def main():
    loop = asyncio.get_event_loop()
    while True:
        for i in range(20):
            loop.call_later(i * 0.010, do_it, i)
        await wait_for_something_else()

asyncio.run(main())

这些 do_it 任务可以使用由 loop.call_later 编辑的句柄 return 取消。

但是,这种方法需要切换您的程序以在整个过程中使用 asyncio,或者 运行在单独的线程中使用 asyncio 事件循环。

正如您所说,在您的代码中,每个系列的 20 个 do_it 调用在 wait_for_something_else 完成时开始,我建议在 join 的每次迭代中调用 join 方法=14=]循环:

import threading
timers = []
while True:
    for i in range(20):
        t = threading.Timer(i * 0.010, do_it, [i])    # I pass the parameter i to function do_it
        t.start()
        timers.append(t)  # so that they can be cancelled if needed
    wait_for_something_else() # this can last from 5 ms to 20 seconds
    for t in timers[-20:]:
        t.join()

我在 Python 中没有太多的线程经验,所以请放轻松。 concurrent.futures 库是 Python3 的一部分,非常简单。我正在为您提供一个示例,以便您了解它是多么简单。

Concurrent.futures 只有一个线程用于 do_it() 和并发性:

import concurrent.futures
import time

def do_it(iteration):
  time.sleep(0.1)
  print('do it counter', iteration)
 
def wait_for_something_else():
    time.sleep(1)
    print('waiting for something else')

def single_thread():
  with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    futures = (executor.submit(do_it, i) for i in range(20))
    for future in concurrent.futures.as_completed(futures):
        future.result()
 
def do_asap():
  wait_for_something_else()

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(single_thread), executor.submit(do_asap)] 
    for future in concurrent.futures.as_completed(futures):
        future.result()

上面的代码使用 max_workers=1 个线程在单个线程中执行 do_it()。在第 13 行,do_it() 使用选项 max_workers=1 将工作限制为一个线程。

第22行,两种方法都提交给concurrent.futures线程池执行器。第 21-24 行的代码在线程池 do_it 运行 的单个非阻塞线程上启用两种方法。

concurrent.futures 文档描述了如何控制线程数。当不指定max_workers时,分配给两个进程的线程总数为max_workers = min(32, os.cpu_count() + 4).

听起来你想要的东西是非阻塞和异步的,但也是单处理和单线程的(一个线程专用于 do_it)。

如果是这种情况,尤其是涉及到任何网络时,只要您没有在主线程上积极地进行认真的 I/O,使用 asyncio 可能是值得的.

它旨在处理非阻塞操作,并允许您发出所有请求而无需等待响应。

示例:

import asyncio


def main():
    while True:
        tasks = []
        for i in range(20):
            tasks.append(asyncio.create_task(do_it(i)))  
        await wait_for_something_else()
        for task in tasks:
            await task

asyncio.run(main())

考虑到阻塞所花费的时间 I/O(秒)- 您可能会浪费更多的时间来管理线程,而不是生成单独的线程来执行这些其他操作所节省的时间。

do_it 运行顺序可取消

运行 所有 do_it 在一个线程中并在特定时间休眠(可能不与睡眠一起)

使用变量“should_run_it”检查do_it是否应该运行(可取消?)

是这样的吗?

import threading
import time

def do_it(i):
    print(f"[{i}] {time.time()}")

should_run_it = {i:True for i in range(20)}

def guard_do_it(i):
    if should_run_it[i]:
        do_it(i)

def run_do_it():
    for i in range(20):
        guard_do_it(i)
        time.sleep(0.010)

if __name__ == "__main__":
    t = threading.Timer(0.010, run_do_it)
    start = time.time()
    print(start)
    t.start()
    #should_run_it[5] = should_run_it[10] = should_run_it[15] = False # test
    t.join()
    end = time.time()
    print(end)
    print(end - start)