在 Python 中使用回调显示异步行为的最小可重现示例
A minimal reproducible example to show asynchronous behavior using callbacks in Python
背景:对我来说似乎很清楚,回调的概念很灵活,但我也认为,它使代码更快。然而,下面的示例有效,但它不能表明使用回调 Ref1:
可以节省时间
import time
from time import sleep
def callbackFunc(delay):
time.sleep(delay)
print("callback: message 3 delay " + str(delay))
def saysomething(delay, callback):
print("saysomething: message 2 delay " + str(delay))
callback(delay) # hier muss ich schon wissen, dass nur "delay" benötigt wird...
time.sleep(2)
if __name__ == '__main__':
t0 = time.time()
print("main: message 1.")
saysomething(2, callbackFunc)
print("main: message 4.")
print("\ntime: ",time.time() - t0)
输出
main: message 1.
saysomething: message 2 delay 2
callback: message 3 delay 2
main: message 4.
time: 4.01
那么我怎样才能达到这样的效果
main: message 1.
saysomething: message 2 delay 2
callback: message 3 delay 2
main: message 4.
time: 2 !!!!!!!!!!!!!
也许甚至可以调换消息 3 和 4 的顺序?还是我弄错了什么?
也许这些不使用回调但显示异步行为的答案 and here and the following code from here有帮助?
这无关紧要w.r.t。路由库使用的 SWIG 回调。
Re-establishing前提
在我们开始之前,让我们澄清一下您的 post:
回调 不会 使代码本身更快。程序确实提前结束了,因为接受回调的函数没有阻塞.
此外,回调通常在接受回调的函数中同步执行,因此您的示例仍需要 4 秒。一个更合理的起点是:
import time
def callbackFunc(delay):
# time.sleep(delay) # -
print("callback: message 3 delay " + str(delay))
def saysomething(delay, callback):
print("saysomething: message 2 delay " + str(delay))
time.sleep(2) # +
callback(delay)
# time.sleep(2) # -
if __name__ == '__main__':
t0 = time.time()
print("main: message 1.")
saysomething(2, callbackFunc)
print("\ntime: ", time.time() - t0, "\n") # +
saysomething(2, callbackFunc) # +
print("\ntime: ", time.time() - t0, "\n") # +
print("main: message 4.")
print("\ntime: ", time.time() - t0)
输出:
main: message 1.
saysomething: message 2 delay 2
callback: message 3 delay 2
time: 2.00
saysomething: message 2 delay 2
callback: message 3 delay 2
time: 4.00
main: message 4.
time: 4.00
为什么示例不起作用
sleep()
暂停调用线程的执行,因此如果您想显示异步行为,则不能在主线程上调用它。
您可以使用线程或事件循环来显示异步行为。
一个使用事件循环的例子
下面是一个使用 built-in 事件循环的例子:
import asyncio # +
import time
def callbackFunc(delay):
print("callback: message 3 delay " + str(delay))
def saysomething(delay, callback):
print("saysomething: message 2 delay " + str(delay))
# time.sleep(2) # -
# callback(delay) # -
asyncio.get_event_loop().call_later(2, callback, delay) # +
def wait_for_callbacks(): # +
def run_until_complete(loop):
loop.call_soon(lambda: run_until_complete(loop) if loop._scheduled else loop.stop())
loop = asyncio.get_event_loop()
run_until_complete(loop)
loop.run_forever()
if __name__ == '__main__':
t0 = time.time()
print("main: message 1.")
saysomething(2, callbackFunc)
print("\ntime: ", time.time() - t0, "\n")
saysomething(2, callbackFunc)
print("\ntime: ", time.time() - t0, "\n")
print("main: message 4.")
wait_for_callbacks() # +
print("\ntime: ", time.time() - t0)
输出:
main: message 1.
saysomething: message 2 delay 2
time: 0.000
saysomething: message 2 delay 2
time: 0.000
main: message 4.
callback: message 3 delay 2
callback: message 3 delay 2
time: 2.000
一个使用线程的例子
这是一个使用 built-in OS 线程的示例:
import threading # +
import time
def threadify(func): # +
def _func(*args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
thread.start()
return thread
return _func
def callbackFunc(delay):
print("callback: message 3 delay " + str(delay))
@threadify # +
def saysomething(delay, callback):
print("saysomething: message 2 delay " + str(delay))
time.sleep(2)
callback(delay)
def wait_for_callbacks(): # +
for thread in threading.enumerate():
if thread is not threading.current_thread():
thread.join()
if __name__ == '__main__':
t0 = time.time()
print("main: message 1.")
saysomething(2, callbackFunc)
print("\ntime: ", time.time() - t0, "\n")
saysomething(2, callbackFunc)
print("\ntime: ", time.time() - t0, "\n")
print("main: message 4.")
wait_for_callbacks() # +
print("\ntime: ", time.time() - t0)
输出:(由于 OS 线程的开销,比使用 built-in 事件循环稍慢)
main: message 1.
saysomething: message 2 delay 2
time: 0.000
saysomething: message 2 delay 2
time: 0.000
main: message 4.
callback: message 3 delay 2
callback: message 3 delay 2
time: 2.00
关于回调是提供“灵活性”的一种方式,你是正确的,但它实际上与“速度”无关。 典型 回调的用例是函数 1 调用函数 2 的情况,函数 2 正在执行某个函数,该函数的完成是异步发生的,因此函数 2 或多或少会立即返回但函数1 仍然需要安排在异步完成发生时发生通知。因此,函数 1 向函数 2 传递一个回调函数,该函数将在完成时使用 agreed-upon 个参数调用。
在你的情况下,你的异步事件是一个时间间隔的到期,你的回调函数传递的是那个延迟时间间隔。现在事实证明 Python 带有一个 sched.scheduler
class 允许您通过指定绝对时间值或延迟,将添加到当前时间以计算事件发生的绝对时间 运行。此事件只是一个回调函数,您可以向其指定您想要的任何参数。在我看来,这个 class 的问题在于,您必须首先输入所有您想要 运行ning 的事件,然后调用一个 run
方法,该方法将阻塞直到所有事件都是 运行。更好的方法是只指定一个你想要 运行 的未来事件(即回调)并继续而不阻塞,这个事件将在另一个线程中异步 运行 。因此,我对 sched.scheduler
class 进行了大量修改以创建 Scheduler
class。您的代码将如下所示:
import time
from scheduler import Scheduler
def callbackFunc(msg_no, delay):
print(f"callback: message number {msg_no}, delay {delay} at {time.time()}")
def saysomething(msg_no, delay, callback):
print(f"saysomething: message {msg_no}, delay {delay} at {time.time()}")
scheduler.enter(delay, callbackFunc, args=(msg_no, delay,))
time.sleep(2)
if __name__ == '__main__':
scheduler = Scheduler()
saysomething(1, 1, callbackFunc)
saysomething(2, 2, callbackFunc)
saysomething(3, 3, callbackFunc)
打印:
saysomething: message 1, delay 1 at 1644584120.865646
callback: message number 1, delay 1 at 1644584121.8778687
saysomething: message 2, delay 2 at 1644584122.8747876
saysomething: message 3, delay 3 at 1644584124.8790839
callback: message number 2, delay 2 at 1644584124.8790839
callback: message number 3, delay 3 at 1644584127.9029477
和 Scheduler
class:
"""
Modified sched.schedule class.
"""
import time
import heapq
from collections import namedtuple
import threading
class Event(namedtuple('Event', 'start_time, priority, action, args, kwargs')):
__slots__ = []
def __eq__(s, o): return (s.start_time, s.priority) == (o.start_time, o.priority)
def __lt__(s, o): return (s.start_time, s.priority) < (o.start_time, o.priority)
def __le__(s, o): return (s.start_time, s.priority) <= (o.start_time, o.priority)
def __gt__(s, o): return (s.start_time, s.priority) > (o.start_time, o.priority)
def __ge__(s, o): return (s.start_time, s.priority) >= (o.start_time, o.priority)
Event.start_time.__doc__ = ('''Numeric type compatible with the return value from time.monotonic.''')
Event.priority.__doc__ = ('''Events scheduled for the same time will be executed
in the order of their priority.''')
Event.action.__doc__ = ('''Executing the event means executing
action(*args, **kwargs)''')
Event.args.__doc__ = ('''args is a sequence holding the positional
arguments for the action.''')
Event.kwargs.__doc__ = ('''kwargs is a dictionary holding the keyword
arguments for the action.''')
_sentinel = object()
class Scheduler:
def __init__(self, daemon=False):
"""
Initialize a new instance.
If daemon is True, the scheduler thread will run as a daemon so it will be possible
for the main thread to terminate with scheduled events yet to run.
Regardless of how the daemon argument is set, when a new event is added a new
scheduler thread will be started if the previous thread has terminated.
"""
self._queue = []
self._daemon=daemon
self._running = False
self._got_event = threading.Condition()
self._queue_exhausted = threading.Event()
self._queue_exhausted.set()
self._thread = None
def __del__(self):
if not self._daemon and self._thread:
self._thread.join()
def enterabs(self, start_time, action, args=(), kwargs=_sentinel, priority=1):
"""Enter a new event in the queue at an absolute time.
Returns an ID for the event which can be used to remove it,
if necessary.
"""
if kwargs is _sentinel:
kwargs = {}
event = Event(start_time, priority, action, args, kwargs)
with self._got_event:
heapq.heappush(self._queue, event)
self._queue_exhausted.clear()
if not self._running:
if self._thread:
self._thread.join() # tidy up
self._running = True
self._thread = threading.Thread(target=self._run, daemon=self._daemon).start()
else:
self._got_event.notify()
return event # The ID
def enter(self, delay, action, args=(), kwargs=_sentinel, priority=1):
"""A variant that specifies the time as a relative time.
This is actually the more commonly used interface.
"""
start_time = time.monotonic() + delay
return self.enterabs(start_time, action, args, kwargs, priority)
def cancel(self, event):
"""Remove an event from the queue.
This must be presented the ID as returned by enter().
If the event is not in the queue, this raises ValueError.
"""
with self._got_event:
self._queue.remove(event)
heapq.heapify(self._queue)
def empty(self):
"""Check whether the queue is empty."""
with self._got_event:
return not self._queue
def running(self):
"""Check whether the scheduler is running."""
with self._got_event:
return self._running
def _run(self):
"""Execute events until the queue is empty."""
# localize variable access to minimize overhead
# and to improve thread safety
got_event = self._got_event
q = self._queue
delayfunc = time.sleep
timefunc = time.monotonic
pop = heapq.heappop
queue_exhausted = self._queue_exhausted
while True:
try:
while True:
with got_event:
if not q:
self._running = False
queue_exhausted.set()
return
start_time, priority, action, args, kwargs = q[0]
now = timefunc()
if start_time > now:
# Wait for either the time to elapse or a new Event to be added:
got_event.wait(timeout=(start_time - now))
continue
pop(q)
action(*args, **kwargs)
delayfunc(0) # Let other threads run
except:
pass
@property
def queue(self):
"""An ordered list of upcoming events.
Events are named tuples with fields for:
start_time, priority, action, argss, kwargs
"""
# Use heapq to sort the queue rather than using 'sorted(self._queue)'.
# With heapq, two events scheduled at the same time will show in
# the actual order they would be retrieved.
with self._got_event:
events = self._queue[:]
return list(map(heapq.heappop, [events]*len(events)))
def wait_for_queue_empty(self):
"""Wait for the queue to become empty."""
return self._queue_exhausted.wait()
背景:对我来说似乎很清楚,回调的概念很灵活,但我也认为,它使代码更快。然而,下面的示例有效,但它不能表明使用回调 Ref1:
可以节省时间import time
from time import sleep
def callbackFunc(delay):
time.sleep(delay)
print("callback: message 3 delay " + str(delay))
def saysomething(delay, callback):
print("saysomething: message 2 delay " + str(delay))
callback(delay) # hier muss ich schon wissen, dass nur "delay" benötigt wird...
time.sleep(2)
if __name__ == '__main__':
t0 = time.time()
print("main: message 1.")
saysomething(2, callbackFunc)
print("main: message 4.")
print("\ntime: ",time.time() - t0)
输出
main: message 1.
saysomething: message 2 delay 2
callback: message 3 delay 2
main: message 4.
time: 4.01
那么我怎样才能达到这样的效果
main: message 1.
saysomething: message 2 delay 2
callback: message 3 delay 2
main: message 4.
time: 2 !!!!!!!!!!!!!
也许甚至可以调换消息 3 和 4 的顺序?还是我弄错了什么?
也许这些不使用回调但显示异步行为的答案
这无关紧要w.r.t。路由库使用的 SWIG 回调。
Re-establishing前提
在我们开始之前,让我们澄清一下您的 post:
回调 不会 使代码本身更快。程序确实提前结束了,因为接受回调的函数没有阻塞.
此外,回调通常在接受回调的函数中同步执行,因此您的示例仍需要 4 秒。一个更合理的起点是:
import time
def callbackFunc(delay):
# time.sleep(delay) # -
print("callback: message 3 delay " + str(delay))
def saysomething(delay, callback):
print("saysomething: message 2 delay " + str(delay))
time.sleep(2) # +
callback(delay)
# time.sleep(2) # -
if __name__ == '__main__':
t0 = time.time()
print("main: message 1.")
saysomething(2, callbackFunc)
print("\ntime: ", time.time() - t0, "\n") # +
saysomething(2, callbackFunc) # +
print("\ntime: ", time.time() - t0, "\n") # +
print("main: message 4.")
print("\ntime: ", time.time() - t0)
输出:
main: message 1.
saysomething: message 2 delay 2
callback: message 3 delay 2
time: 2.00
saysomething: message 2 delay 2
callback: message 3 delay 2
time: 4.00
main: message 4.
time: 4.00
为什么示例不起作用
sleep()
暂停调用线程的执行,因此如果您想显示异步行为,则不能在主线程上调用它。
您可以使用线程或事件循环来显示异步行为。
一个使用事件循环的例子
下面是一个使用 built-in 事件循环的例子:
import asyncio # +
import time
def callbackFunc(delay):
print("callback: message 3 delay " + str(delay))
def saysomething(delay, callback):
print("saysomething: message 2 delay " + str(delay))
# time.sleep(2) # -
# callback(delay) # -
asyncio.get_event_loop().call_later(2, callback, delay) # +
def wait_for_callbacks(): # +
def run_until_complete(loop):
loop.call_soon(lambda: run_until_complete(loop) if loop._scheduled else loop.stop())
loop = asyncio.get_event_loop()
run_until_complete(loop)
loop.run_forever()
if __name__ == '__main__':
t0 = time.time()
print("main: message 1.")
saysomething(2, callbackFunc)
print("\ntime: ", time.time() - t0, "\n")
saysomething(2, callbackFunc)
print("\ntime: ", time.time() - t0, "\n")
print("main: message 4.")
wait_for_callbacks() # +
print("\ntime: ", time.time() - t0)
输出:
main: message 1.
saysomething: message 2 delay 2
time: 0.000
saysomething: message 2 delay 2
time: 0.000
main: message 4.
callback: message 3 delay 2
callback: message 3 delay 2
time: 2.000
一个使用线程的例子
这是一个使用 built-in OS 线程的示例:
import threading # +
import time
def threadify(func): # +
def _func(*args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
thread.start()
return thread
return _func
def callbackFunc(delay):
print("callback: message 3 delay " + str(delay))
@threadify # +
def saysomething(delay, callback):
print("saysomething: message 2 delay " + str(delay))
time.sleep(2)
callback(delay)
def wait_for_callbacks(): # +
for thread in threading.enumerate():
if thread is not threading.current_thread():
thread.join()
if __name__ == '__main__':
t0 = time.time()
print("main: message 1.")
saysomething(2, callbackFunc)
print("\ntime: ", time.time() - t0, "\n")
saysomething(2, callbackFunc)
print("\ntime: ", time.time() - t0, "\n")
print("main: message 4.")
wait_for_callbacks() # +
print("\ntime: ", time.time() - t0)
输出:(由于 OS 线程的开销,比使用 built-in 事件循环稍慢)
main: message 1.
saysomething: message 2 delay 2
time: 0.000
saysomething: message 2 delay 2
time: 0.000
main: message 4.
callback: message 3 delay 2
callback: message 3 delay 2
time: 2.00
关于回调是提供“灵活性”的一种方式,你是正确的,但它实际上与“速度”无关。 典型 回调的用例是函数 1 调用函数 2 的情况,函数 2 正在执行某个函数,该函数的完成是异步发生的,因此函数 2 或多或少会立即返回但函数1 仍然需要安排在异步完成发生时发生通知。因此,函数 1 向函数 2 传递一个回调函数,该函数将在完成时使用 agreed-upon 个参数调用。
在你的情况下,你的异步事件是一个时间间隔的到期,你的回调函数传递的是那个延迟时间间隔。现在事实证明 Python 带有一个 sched.scheduler
class 允许您通过指定绝对时间值或延迟,将添加到当前时间以计算事件发生的绝对时间 运行。此事件只是一个回调函数,您可以向其指定您想要的任何参数。在我看来,这个 class 的问题在于,您必须首先输入所有您想要 运行ning 的事件,然后调用一个 run
方法,该方法将阻塞直到所有事件都是 运行。更好的方法是只指定一个你想要 运行 的未来事件(即回调)并继续而不阻塞,这个事件将在另一个线程中异步 运行 。因此,我对 sched.scheduler
class 进行了大量修改以创建 Scheduler
class。您的代码将如下所示:
import time
from scheduler import Scheduler
def callbackFunc(msg_no, delay):
print(f"callback: message number {msg_no}, delay {delay} at {time.time()}")
def saysomething(msg_no, delay, callback):
print(f"saysomething: message {msg_no}, delay {delay} at {time.time()}")
scheduler.enter(delay, callbackFunc, args=(msg_no, delay,))
time.sleep(2)
if __name__ == '__main__':
scheduler = Scheduler()
saysomething(1, 1, callbackFunc)
saysomething(2, 2, callbackFunc)
saysomething(3, 3, callbackFunc)
打印:
saysomething: message 1, delay 1 at 1644584120.865646
callback: message number 1, delay 1 at 1644584121.8778687
saysomething: message 2, delay 2 at 1644584122.8747876
saysomething: message 3, delay 3 at 1644584124.8790839
callback: message number 2, delay 2 at 1644584124.8790839
callback: message number 3, delay 3 at 1644584127.9029477
和 Scheduler
class:
"""
Modified sched.schedule class.
"""
import time
import heapq
from collections import namedtuple
import threading
class Event(namedtuple('Event', 'start_time, priority, action, args, kwargs')):
__slots__ = []
def __eq__(s, o): return (s.start_time, s.priority) == (o.start_time, o.priority)
def __lt__(s, o): return (s.start_time, s.priority) < (o.start_time, o.priority)
def __le__(s, o): return (s.start_time, s.priority) <= (o.start_time, o.priority)
def __gt__(s, o): return (s.start_time, s.priority) > (o.start_time, o.priority)
def __ge__(s, o): return (s.start_time, s.priority) >= (o.start_time, o.priority)
Event.start_time.__doc__ = ('''Numeric type compatible with the return value from time.monotonic.''')
Event.priority.__doc__ = ('''Events scheduled for the same time will be executed
in the order of their priority.''')
Event.action.__doc__ = ('''Executing the event means executing
action(*args, **kwargs)''')
Event.args.__doc__ = ('''args is a sequence holding the positional
arguments for the action.''')
Event.kwargs.__doc__ = ('''kwargs is a dictionary holding the keyword
arguments for the action.''')
_sentinel = object()
class Scheduler:
def __init__(self, daemon=False):
"""
Initialize a new instance.
If daemon is True, the scheduler thread will run as a daemon so it will be possible
for the main thread to terminate with scheduled events yet to run.
Regardless of how the daemon argument is set, when a new event is added a new
scheduler thread will be started if the previous thread has terminated.
"""
self._queue = []
self._daemon=daemon
self._running = False
self._got_event = threading.Condition()
self._queue_exhausted = threading.Event()
self._queue_exhausted.set()
self._thread = None
def __del__(self):
if not self._daemon and self._thread:
self._thread.join()
def enterabs(self, start_time, action, args=(), kwargs=_sentinel, priority=1):
"""Enter a new event in the queue at an absolute time.
Returns an ID for the event which can be used to remove it,
if necessary.
"""
if kwargs is _sentinel:
kwargs = {}
event = Event(start_time, priority, action, args, kwargs)
with self._got_event:
heapq.heappush(self._queue, event)
self._queue_exhausted.clear()
if not self._running:
if self._thread:
self._thread.join() # tidy up
self._running = True
self._thread = threading.Thread(target=self._run, daemon=self._daemon).start()
else:
self._got_event.notify()
return event # The ID
def enter(self, delay, action, args=(), kwargs=_sentinel, priority=1):
"""A variant that specifies the time as a relative time.
This is actually the more commonly used interface.
"""
start_time = time.monotonic() + delay
return self.enterabs(start_time, action, args, kwargs, priority)
def cancel(self, event):
"""Remove an event from the queue.
This must be presented the ID as returned by enter().
If the event is not in the queue, this raises ValueError.
"""
with self._got_event:
self._queue.remove(event)
heapq.heapify(self._queue)
def empty(self):
"""Check whether the queue is empty."""
with self._got_event:
return not self._queue
def running(self):
"""Check whether the scheduler is running."""
with self._got_event:
return self._running
def _run(self):
"""Execute events until the queue is empty."""
# localize variable access to minimize overhead
# and to improve thread safety
got_event = self._got_event
q = self._queue
delayfunc = time.sleep
timefunc = time.monotonic
pop = heapq.heappop
queue_exhausted = self._queue_exhausted
while True:
try:
while True:
with got_event:
if not q:
self._running = False
queue_exhausted.set()
return
start_time, priority, action, args, kwargs = q[0]
now = timefunc()
if start_time > now:
# Wait for either the time to elapse or a new Event to be added:
got_event.wait(timeout=(start_time - now))
continue
pop(q)
action(*args, **kwargs)
delayfunc(0) # Let other threads run
except:
pass
@property
def queue(self):
"""An ordered list of upcoming events.
Events are named tuples with fields for:
start_time, priority, action, argss, kwargs
"""
# Use heapq to sort the queue rather than using 'sorted(self._queue)'.
# With heapq, two events scheduled at the same time will show in
# the actual order they would be retrieved.
with self._got_event:
events = self._queue[:]
return list(map(heapq.heappop, [events]*len(events)))
def wait_for_queue_empty(self):
"""Wait for the queue to become empty."""
return self._queue_exhausted.wait()