如何提前时钟并完成所有事件

How to advance clock and going through all the events

出于测试目的阅读此 answer(第 2 点)与 Twistedtask.Clock 相关的问题,我发现很奇怪没有办法提前时钟从 t0t1,同时捕获 t0t1.

中的所有 callLater 调用

当然,您可以通过以下方式解决此问题:

clock = task.Clock()
reactor.callLater = clock.callLater

...

def advance_clock(total_elapsed, step=0.01):
    elapsed = 0
    while elapsed < total_elapsed:
        clock.advance(step)
        elapsed += step

...

time_to_advance = 10  # seconds
advance_clock(time_to_advance)

但随后我们将问题转向选择足够小的 step,这对于 callLater 调用来说可能非常棘手,例如从概率分布中抽样时间。

有人能想出解决这个问题的方法吗?

考虑到 Twisted 的典型用途-class 是混合硬件事件和计时器,我很困惑您为什么要这样做,但是...

我的理解是,Twisted 内部正在通过 reactor 对象内部的多个列表跟踪 callLater 事件(参见:http://twistedmatrix.com/trac/browser/tags/releases/twisted-15.2.0/twisted/internet/base.py#L437 - class ReactorBase 内部的 xxxTimedCalls 列表)

我还没有做任何工作来弄清楚这些列表是否暴露在任何地方,但如果你想把反应堆的生命掌握在自己手中,我相信你可以闯入。

通过访问时序列表,您可以简单地将时间转发到列表的下一个元素时...但是如果您尝试测试与 IO 事件交互的代码,我无法想象这会做任何事情,但混淆你...

祝你好运

I found very weird that there is no way to advance the clock from t0 to t1 while catching all the callLater calls within t0 and t1.

根据您稍后在问题中所写的内容,我假设您指出的情况是以下示例程序所演示的情况:

from twisted.internet.task import Clock

def foo(reactor, n):
    if n == 0:
        print "Done!"
    reactor.callLater(1, foo, reactor, n - 1)

reactor = Clock()
foo(reactor, 10)
reactor.advance(10)

人们可能希望这个程序打印 Done! 但它没有。如果最后一行替换为:

for i in range(10):
    reactor.advance(1)

然后生成的程序打印Done!.

Clock 以这种方式工作的原因是它正是 真实 时钟的工作方式。据我所知,目前还没有使用 continuous 时间系统的计算机时钟。我不会说不可能在具有离散步骤的时钟之上实现定时事件系统,这样它似乎可以提供连续的时间流 - 但我会说 Twisted 没有尝试这样做。

Clock 和真正的 reactor 实现之间唯一真正的区别是 Clock 可以 使时间步长比你大得多可能会遇到 典型 真实反应堆的使用情况。

然而,对于一个真正的反应堆来说,很可能会出现这样一种情况,即大量的时间都在一个离散的步骤中通过。这可能是因为系统时钟发生了变化(有一些讨论可以独立于系统时钟来安排事件,这样这种情况就会消失)或者可能是因为某些应用程序代码阻塞了反应器一段时间(实际上,应用程序代码总是 阻塞反应堆!但在典型的程序中,它只会阻塞反应堆的时间很短,大多数人可以忽略)。

Clock 一种模仿这些大步骤的方法,可以编写测试程序在出现其中一种情况时执行的操作。例如,也许您真的很在意,当内核由于 Linux I/O 电梯算法中的一个奇怪的怪癖而决定不将您的程序安排 2.1 秒时,您的物理引擎仍然会计算 2.1 秒的物理即使您的 200Hz 模拟循环的 420 次调用已被跳过。

可以公平地说,Twisted 提供的默认(标准?仅?)基于时间的测试工具应该对常见情况更加友好......或者不是。也许这会鼓励人们编写仅在常见情况下有效的程序,并在出现不常见(但最终不可避免)情况时中断现实世界。我不确定。

关于迈克关于准确推进到下一个预定通话的建议,您可以轻松做到这一点,而无需破解任何内部结构。 clock.advance(clock.getDelayedCalls()[0].getTime() - clock.seconds()) 将完全做到这一点(也许你会争辩说 Clock 会更好,如果它至少为此提供一个明显的辅助函数来简化对常见情况的测试)。请记住,真正的时钟 不会 像这样前进,所以如果你的代码在你的单元测试中有某种理想的行为,当你使用这个技巧时,不要误以为这意味着相同理想的行为将存在于实际使用中。

这是一个函数,它将通过迭代 reactor.getDelayedCalls 将反应器推进到下一个 IDelayedCall。这存在 Mike 提到的未捕获 IO 事件的问题,因此您可以指定它应该等待的最小和最大时间,以及最大时间步长。

def advance_through_delayeds(reactor, min_t=None, max_t=None, max_step=None):
    elapsed = 0
    while True:
        if max_t is not None and elapsed >= max_t:
            break
        try:
            step = min(d.getTime() - reactor.seconds() for d in reactor.getDelayedCalls())
        except ValueError:
            # nothing else pending
            if min_t is not None and elapsed < min_t:
                step = min_t - elapsed
            else:
                break
        if max_step is not None:
            step = min(step, max_step)
        if max_t is not None:
            step = min(step, max_t-elapsed)
        reactor.advance(step)
        elapsed += step
    return elapsed

如果您需要等待某些 I/O 完成,请将 min_tmax_step 设置为合理的值。

# wait at least 10s, advancing the reactor by no more than 0.1s at a time
advance_through_delayeds(reactor, min_t=10, max_step=0.1)

如果设置了min_t,则getDelayedCallsreturns到达该时间后将退出一次空列表。

始终将 max_t 设置为合理的值以防止测试套件挂起可能是个好主意。例如,在 JPC 的上述 foo 函数中,它确实到达了 print "Done!" 语句,但随后将永远挂起,因为回调链永远不会完成。