在 python 2 中每 n 毫秒周期性地在 python 中执行一个函数
Execute a function periodically in python for every n milliseconds in python 2
我想知道如何每 10 毫秒在 Python 中执行一个函数。我尝试使用线程(使用 threading.Timer(0.001, fun)
),但 threading.Timer
最多只能正确执行几秒钟。有什么可以在 ms(毫秒)内执行 task/function 吗?
代码如下
def task():
print "called every 10ms"
print time.ctime()
threading.Timer(0.01,task).start()
task()
我自己试了一下,确实有效,但 Python 在控制台中似乎没那么快。在我的例子中,它只达到每秒大约 60 次运行。
import threading
import time
def hello(*args):
print(str(args[0])+" It's "+str(time.ctime()))
next=int(args[0])+1
threading.Timer(0.001, hello,[str(next)]).start()
hello("1")
我也有类似的问题,我需要在 1000Hz 下进行一些采样,并且在长 运行 上不能有任何明显的漂移。尝试休眠 0.001 秒总是会导致较低的采样率(漂移),即使我试图估计采样代码执行的时间(在休眠前后使用 time.time())。
事实证明,一个简单的解决方案是跟踪执行次数并用它来计算您是否迟到。
import time
import threading
class PeriodicSleeper(threading.Thread):
def __init__(self, task_function, period):
super().__init__()
self.task_function = task_function
self.period = period
self.i = 0
self.t0 = time.time()
self.start()
def sleep(self):
self.i += 1
delta = self.t0 + self.period * self.i - time.time()
if delta > 0:
time.sleep(delta)
def run(self):
while True:
self.task_function()
self.sleep()
并进行测试:
def task():
t = time.time()
if abs(t - round(t)) <= 0.0005:
print(t, 'Mean Frequency:', sleeper.i / (t - sleeper.t0))
sleeper = PeriodicSleeper(task, 0.001)
导致小于半个周期的偏差和收敛到所需值的平均频率。
1623373581.0000355 Mean Frequency: 999.8594124106243
1623373582.0000308 Mean Frequency: 999.9576212826602
1623373583.0000248 Mean Frequency: 999.9771123402633
1623373584.0000222 Mean Frequency: 999.9844427446347
1623373585.0000298 Mean Frequency: 999.9862123585227
1623373586.000023 Mean Frequency: 999.989990000442
....
1623373863.0000231 Mean Frequency: 999.9998049640523
1623373864.000024 Mean Frequency: 999.9998022878916
1623373865.0000224 Mean Frequency: 999.9998088494829
1623373866.0000248 Mean Frequency: 999.9998011675633
我想知道如何每 10 毫秒在 Python 中执行一个函数。我尝试使用线程(使用 threading.Timer(0.001, fun)
),但 threading.Timer
最多只能正确执行几秒钟。有什么可以在 ms(毫秒)内执行 task/function 吗?
代码如下
def task():
print "called every 10ms"
print time.ctime()
threading.Timer(0.01,task).start()
task()
我自己试了一下,确实有效,但 Python 在控制台中似乎没那么快。在我的例子中,它只达到每秒大约 60 次运行。
import threading
import time
def hello(*args):
print(str(args[0])+" It's "+str(time.ctime()))
next=int(args[0])+1
threading.Timer(0.001, hello,[str(next)]).start()
hello("1")
我也有类似的问题,我需要在 1000Hz 下进行一些采样,并且在长 运行 上不能有任何明显的漂移。尝试休眠 0.001 秒总是会导致较低的采样率(漂移),即使我试图估计采样代码执行的时间(在休眠前后使用 time.time())。
事实证明,一个简单的解决方案是跟踪执行次数并用它来计算您是否迟到。
import time
import threading
class PeriodicSleeper(threading.Thread):
def __init__(self, task_function, period):
super().__init__()
self.task_function = task_function
self.period = period
self.i = 0
self.t0 = time.time()
self.start()
def sleep(self):
self.i += 1
delta = self.t0 + self.period * self.i - time.time()
if delta > 0:
time.sleep(delta)
def run(self):
while True:
self.task_function()
self.sleep()
并进行测试:
def task():
t = time.time()
if abs(t - round(t)) <= 0.0005:
print(t, 'Mean Frequency:', sleeper.i / (t - sleeper.t0))
sleeper = PeriodicSleeper(task, 0.001)
导致小于半个周期的偏差和收敛到所需值的平均频率。
1623373581.0000355 Mean Frequency: 999.8594124106243
1623373582.0000308 Mean Frequency: 999.9576212826602
1623373583.0000248 Mean Frequency: 999.9771123402633
1623373584.0000222 Mean Frequency: 999.9844427446347
1623373585.0000298 Mean Frequency: 999.9862123585227
1623373586.000023 Mean Frequency: 999.989990000442
....
1623373863.0000231 Mean Frequency: 999.9998049640523
1623373864.000024 Mean Frequency: 999.9998022878916
1623373865.0000224 Mean Frequency: 999.9998088494829
1623373866.0000248 Mean Frequency: 999.9998011675633