Python, Raspberry pi, 精确每10毫秒调用一个任务

Python, Raspberry pi, call a task every 10 milliseconds precisely

我目前正在尝试每 10 毫秒调用一次函数以从传感器获取数据。

基本上我是从 gpio 中断触发回调,但我更换了传感器,而我目前使用的传感器没有 INT 引脚来驱动回调。

所以我的目标是具有相同的行为,但具有由计时器生成的内部中断。

我试过这个topic

import threading

def work (): 
  threading.Timer(0.25, work).start ()
  print(time.time())
  print "Whosebug"

work ()

但是当我 运行 它时,我可以看到计时器并不是很精确,而且它会随着时间的推移而发生偏差。

1494418413.1584847
Whosebug
1494418413.1686869
Whosebug
1494418413.1788757
Whosebug
1494418413.1890721
Whosebug
1494418413.1992736
Whosebug
1494418413.2094712
Whosebug
1494418413.2196639
Whosebug
1494418413.2298684
Whosebug
1494418413.2400634
Whosebug
1494418413.2502584
Whosebug
1494418413.2604961
Whosebug
1494418413.270702
Whosebug
1494418413.2808678
Whosebug
1494418413.2910736
Whosebug
1494418413.301277
Whosebug

所以计时器每 10 毫秒偏差 0.2 毫秒,几秒后这是一个相当大的偏差。

我知道 python 并不是真正为“实时”制作的,但我认为应该有办法做到这一点。

如果有人已经不得不处理 python 的时间限制,我很乐意提供一些建议。

谢谢。

我尝试了您的解决方案,但得到了奇怪的结果。

这是我的代码:

from multiprocessing import Queue
import threading
import time

def work(queue, target):
    t0 = time.clock()
    print("Target\t" + str(target))
    print("Current time\t" + str(t0))
    print("Delay\t" + str(target - t0))
    print()
    threading.Timer(target - t0, work, [queue, target+0.01]).start()

myQueue = Queue()

target = time.clock() + 0.01
work(myQueue, target)

这是输出

Target  0.054099
Current time    0.044101
Delay   0.009998

Target  0.064099
Current time    0.045622
Delay   0.018477

Target  0.074099
Current time    0.046161
Delay   0.027937999999999998

Target  0.084099
Current time    0.0465
Delay   0.037598999999999994

Target  0.09409899999999999
Current time    0.046877
Delay   0.047221999999999986

Target  0.10409899999999998
Current time    0.047211
Delay   0.05688799999999998

Target  0.11409899999999998
Current time    0.047606
Delay   0.06649299999999997

因此我们可以看到目标每 10 毫秒增加一次,对于第一个循环,计时器的延迟似乎不错。

重点不是从 current_time 开始 + 延迟而是从 0.045622 开始,这表示延迟是 0.001521 而不是 0.01000

我错过了什么吗?我的代码似乎符合您的逻辑,不是吗?


@Chupo_cro

的工作示例

这是我的工作示例

from multiprocessing import Queue
import RPi.GPIO as GPIO
import threading
import time
import os

INTERVAL = 0.01
ledState = True

GPIO.setmode(GPIO.BCM)
GPIO.setup(2, GPIO.OUT, initial=GPIO.LOW)

def work(queue, target):
    try:
        threading.Timer(target-time.time(), work, [queue, target+INTERVAL]).start()
        GPIO.output(2, ledState)
        global ledState
        ledState = not ledState
    except KeyboardInterrupt:
        GPIO.cleanup()

try:
    myQueue = Queue()

    target = time.time() + INTERVAL
    work(myQueue, target)
except KeyboardInterrupt:
    GPIO.cleanup()

此代码适用于我的笔记本电脑 - 记录目标时间和实际时间之间的差异 - 主要是尽量减少 work() 函数中所做的工作,因为例如打印和滚动屏幕可能需要很长时间。

关键是根据调用时间和目标时间之间的差异启动下一个计时器。

我将间隔减慢到 0.1 秒,这样更容易看到在我的 Win7 x64 上超过 10 毫秒的抖动,这会导致将负值传递给 Timer() 调用时出现问题:-o

这会记录 100 个样本,然后打印它们 - 如果您重定向到 .csv 文件,您可以加载到 Excel 以显示图表。

from multiprocessing import Queue
import threading
import time

# this accumulates record of the difference between the target and actual times
actualdeltas = []

INTERVAL = 0.1

def work(queue, target):
    # first thing to do is record the jitter - the difference between target and actual time
    actualdeltas.append(time.clock()-target+INTERVAL)
#    t0 = time.clock()
#    print("Current time\t" + str(time.clock()))
#    print("Target\t" + str(target))
#    print("Delay\t" + str(target - time.clock()))
#    print()
#    t0 = time.clock()
    if len(actualdeltas) > 100:
        # print the accumulated deltas then exit
        for d in actualdeltas:
            print d
        return
    threading.Timer(target - time.clock(), work, [queue, target+INTERVAL]).start()

myQueue = Queue()

target = time.clock() + INTERVAL
work(myQueue, target)

典型输出(即不依赖 Python 中 Windows 的毫秒计时):

0.00947008617187
0.0029628920052
0.0121824719378
0.00582923077099
0.00131316206917
0.0105631524709
0.00437298744466
-0.000251418553351
0.00897956530515
0.0028528821332
0.0118192949105
0.00546301269675
0.0145723546788
0.00910063698529