在 Python 中使用回调显示异步行为的最小可重现示例

A minimal reproducible example to show asynchronous behavior using callbacks in Python

背景:对我来说似乎很清楚,回调的概念很灵活,但我也认为,它使代码更快。然而,下面的示例有效,但它不能表明使用回调 Ref1:

可以节省时间
import time
from time import sleep

def callbackFunc(delay):
  time.sleep(delay)
  print("callback:     message 3 delay " + str(delay))

def saysomething(delay, callback):
  print("saysomething: message 2 delay " + str(delay))
  callback(delay) # hier muss ich schon wissen, dass nur "delay" benötigt wird...
  time.sleep(2)

if __name__ == '__main__':
  t0 = time.time()
  print("main:         message 1.")
  saysomething(2, callbackFunc)
  print("main:         message 4.")
  print("\ntime: ",time.time() - t0)

输出

main:         message 1.
saysomething: message 2 delay 2
callback:     message 3 delay 2
main:         message 4.
time:  4.01

那么我怎样才能达到这样的效果

main:         message 1.
saysomething: message 2 delay 2
callback:     message 3 delay 2
main:         message 4.
time:  2  !!!!!!!!!!!!!

也许甚至可以调换消息 3 和 4 的顺序?还是我弄错了什么?


也许这些不使用回调但显示异步行为的答案 and here and the following code from here有帮助?

这无关紧要w.r.t。路由库使用的 SWIG 回调。

Re-establishing前提

在我们开始之前,让我们澄清一下您的 post:
回调 不会 使代码本身更快。程序确实提前结束了,因为接受回调的函数没有阻塞.

此外,回调通常在接受回调的函数中同步执行,因此您的示例仍需要 4 秒。一个更合理的起点是:

import time

def callbackFunc(delay):
    # time.sleep(delay)  # -
    print("callback:     message 3 delay " + str(delay))

def saysomething(delay, callback):
    print("saysomething: message 2 delay " + str(delay))
    time.sleep(2)    # +
    callback(delay)
    # time.sleep(2)  # -

if __name__ == '__main__':
    t0 = time.time()
    print("main:         message 1.")
    saysomething(2, callbackFunc)
    print("\ntime: ", time.time() - t0, "\n")  # +
    saysomething(2, callbackFunc)              # +
    print("\ntime: ", time.time() - t0, "\n")  # +
    print("main:         message 4.")
    print("\ntime: ", time.time() - t0)

输出:

main:         message 1.
saysomething: message 2 delay 2
callback:     message 3 delay 2

time:  2.00

saysomething: message 2 delay 2
callback:     message 3 delay 2

time:  4.00

main:         message 4.

time:  4.00

为什么示例不起作用

sleep() 暂停调用线程的执行,因此如果您想显示异步行为,则不能在主线程上调用它。

您可以使用线程或事件循环来显示异步行为。

一个使用事件循环的例子

下面是一个使用 built-in 事件循环的例子:

import asyncio  # +
import time

def callbackFunc(delay):
    print("callback:     message 3 delay " + str(delay))

def saysomething(delay, callback):
    print("saysomething: message 2 delay " + str(delay))
    # time.sleep(2)                                          # -
    # callback(delay)                                        # -
    asyncio.get_event_loop().call_later(2, callback, delay)  # +

def wait_for_callbacks():  # +
    def run_until_complete(loop):
        loop.call_soon(lambda: run_until_complete(loop) if loop._scheduled else loop.stop())
    loop = asyncio.get_event_loop()
    run_until_complete(loop)
    loop.run_forever()

if __name__ == '__main__':
    t0 = time.time()
    print("main:         message 1.")
    saysomething(2, callbackFunc)
    print("\ntime: ", time.time() - t0, "\n")
    saysomething(2, callbackFunc)
    print("\ntime: ", time.time() - t0, "\n")
    print("main:         message 4.")
    wait_for_callbacks()  # +
    print("\ntime: ", time.time() - t0)

输出:

main:         message 1.
saysomething: message 2 delay 2

time:  0.000

saysomething: message 2 delay 2

time:  0.000

main:         message 4.
callback:     message 3 delay 2
callback:     message 3 delay 2

time:  2.000

一个使用线程的例子

这是一个使用 built-in OS 线程的示例:

import threading  # +
import time

def threadify(func):  # +
    def _func(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return _func

def callbackFunc(delay):
    print("callback:     message 3 delay " + str(delay))

@threadify  # +
def saysomething(delay, callback):
    print("saysomething: message 2 delay " + str(delay))
    time.sleep(2)
    callback(delay)

def wait_for_callbacks():  # +
    for thread in threading.enumerate():
        if thread is not threading.current_thread():
            thread.join()

if __name__ == '__main__':
    t0 = time.time()
    print("main:         message 1.")
    saysomething(2, callbackFunc)
    print("\ntime: ", time.time() - t0, "\n")
    saysomething(2, callbackFunc)
    print("\ntime: ", time.time() - t0, "\n")
    print("main:         message 4.")
    wait_for_callbacks()  # +
    print("\ntime: ", time.time() - t0)

输出:(由于 OS 线程的开销,比使用 built-in 事件循环稍慢)

main:         message 1.
saysomething: message 2 delay 2

time:  0.000

saysomething: message 2 delay 2

time:  0.000

main:         message 4.
callback:     message 3 delay 2
callback:     message 3 delay 2

time:  2.00

关于回调是提供“灵活性”的一种方式,你是正确的,但它实际上与“速度”无关。 典型 回调的用例是函数 1 调用函数 2 的情况,函数 2 正在执行某个函数,该函数的完成是异步发生的,因此函数 2 或多或少会立即返回但函数1 仍然需要安排在异步完成发生时发生通知。因此,函数 1 向函数 2 传递一个回调函数,该函数将在完成时使用 agreed-upon 个参数调用。

在你的情况下,你的异步事件是一个时间间隔的到期,你的回调函数传递的是那个延迟时间间隔。现在事实证明 Python 带有一个 sched.scheduler class 允许您通过指定绝对时间值或延迟,将添加到当前时间以计算事件发生的绝对时间 运行。此事件只是一个回调函数,您可以向其指定您想要的任何参数。在我看来,这个 class 的问题在于,您必须首先输入所有您想要 运行ning 的事件,然后调用一个 run 方法,该方法将阻塞直到所有事件都是 运行。更好的方法是只指定一个你想要 运行 的未来事件(即回调)并继续而不阻塞,这个事件将在另一个线程中异步 运行 。因此,我对 sched.scheduler class 进行了大量修改以创建 Scheduler class。您的代码将如下所示:

import time
from scheduler import Scheduler

def callbackFunc(msg_no, delay):
    print(f"callback: message number {msg_no}, delay {delay} at {time.time()}")

def saysomething(msg_no, delay, callback):
    print(f"saysomething: message {msg_no}, delay {delay} at {time.time()}")
    scheduler.enter(delay, callbackFunc, args=(msg_no, delay,))
    time.sleep(2)

if __name__ == '__main__':
    scheduler = Scheduler()
    saysomething(1, 1, callbackFunc)
    saysomething(2, 2, callbackFunc)
    saysomething(3, 3, callbackFunc)

打印:

saysomething: message 1, delay 1 at 1644584120.865646
callback: message number 1, delay 1 at 1644584121.8778687
saysomething: message 2, delay 2 at 1644584122.8747876
saysomething: message 3, delay 3 at 1644584124.8790839
callback: message number 2, delay 2 at 1644584124.8790839
callback: message number 3, delay 3 at 1644584127.9029477

Scheduler class:

"""
Modified sched.schedule class.
"""

import time
import heapq
from collections import namedtuple
import threading


class Event(namedtuple('Event', 'start_time, priority, action, args, kwargs')):
    __slots__ = []
    def __eq__(s, o): return (s.start_time, s.priority) == (o.start_time, o.priority)
    def __lt__(s, o): return (s.start_time, s.priority) <  (o.start_time, o.priority)
    def __le__(s, o): return (s.start_time, s.priority) <= (o.start_time, o.priority)
    def __gt__(s, o): return (s.start_time, s.priority) >  (o.start_time, o.priority)
    def __ge__(s, o): return (s.start_time, s.priority) >= (o.start_time, o.priority)

Event.start_time.__doc__ = ('''Numeric type compatible with the return value from time.monotonic.''')
Event.priority.__doc__ = ('''Events scheduled for the same time will be executed
in the order of their priority.''')
Event.action.__doc__ = ('''Executing the event means executing
action(*args, **kwargs)''')
Event.args.__doc__ = ('''args is a sequence holding the positional
arguments for the action.''')
Event.kwargs.__doc__ = ('''kwargs is a dictionary holding the keyword
arguments for the action.''')

_sentinel = object()

class Scheduler:
    def __init__(self, daemon=False):
        """
        Initialize a new instance.
        If daemon is True, the scheduler thread will run as a daemon so it will be possible
        for the main thread to terminate with scheduled events yet to run.
        Regardless of how the daemon argument is set, when a new event is added a new
        scheduler thread will be started if the previous thread has terminated.
        """
        self._queue = []
        self._daemon=daemon
        self._running = False
        self._got_event = threading.Condition()
        self._queue_exhausted = threading.Event()
        self._queue_exhausted.set()
        self._thread = None

    def __del__(self):
        if not self._daemon and self._thread:
            self._thread.join()

    def enterabs(self, start_time, action, args=(), kwargs=_sentinel, priority=1):
        """Enter a new event in the queue at an absolute time.

        Returns an ID for the event which can be used to remove it,
        if necessary.

        """
        if kwargs is _sentinel:
            kwargs = {}
        event = Event(start_time, priority, action, args, kwargs)
        with self._got_event:
            heapq.heappush(self._queue, event)
            self._queue_exhausted.clear()
            if not self._running:
                if self._thread:
                    self._thread.join() # tidy up
                self._running = True
                self._thread = threading.Thread(target=self._run, daemon=self._daemon).start()
            else:
                self._got_event.notify()

        return event # The ID

    def enter(self, delay, action, args=(), kwargs=_sentinel, priority=1):
        """A variant that specifies the time as a relative time.

        This is actually the more commonly used interface.

        """
        start_time = time.monotonic() + delay
        return self.enterabs(start_time, action, args, kwargs, priority)

    def cancel(self, event):
        """Remove an event from the queue.

        This must be presented the ID as returned by enter().
        If the event is not in the queue, this raises ValueError.

        """
        with self._got_event:
            self._queue.remove(event)
            heapq.heapify(self._queue)

    def empty(self):
        """Check whether the queue is empty."""
        with self._got_event:
            return not self._queue

    def running(self):
        """Check whether the scheduler is running."""
        with self._got_event:
            return self._running

    def _run(self):
        """Execute events until the queue is empty."""

        # localize variable access to minimize overhead
        # and to improve thread safety
        got_event = self._got_event
        q = self._queue
        delayfunc = time.sleep
        timefunc = time.monotonic
        pop = heapq.heappop
        queue_exhausted = self._queue_exhausted
        while True:
            try:
                while True:
                    with got_event:
                        if not q:
                            self._running = False
                            queue_exhausted.set()
                            return
                        start_time, priority, action, args, kwargs = q[0]
                        now = timefunc()
                        if start_time > now:
                            # Wait for either the time to elapse or a new Event to be added:
                            got_event.wait(timeout=(start_time - now))
                            continue
                        pop(q)
                    action(*args, **kwargs)
                    delayfunc(0)   # Let other threads run
            except:
                pass

    @property
    def queue(self):
        """An ordered list of upcoming events.

        Events are named tuples with fields for:
            start_time, priority, action, argss, kwargs

        """
        # Use heapq to sort the queue rather than using 'sorted(self._queue)'.
        # With heapq, two events scheduled at the same time will show in
        # the actual order they would be retrieved.
        with self._got_event:
            events = self._queue[:]
        return list(map(heapq.heappop, [events]*len(events)))

    def wait_for_queue_empty(self):
        """Wait for the queue to become empty."""
        return self._queue_exhausted.wait()