python 上的线程 raspberry pi 3 - 速度优化

python Threads on raspberry pi 3 - speed optimization

我正在尝试从两个传感器获取数据,每 1 秒一个,每 10 秒一个。

我有两个函数可以使用来自传感器的值更新小型 OLED 显示器。我希望永久拥有这两个功能 运行 以始终显示最新值。完成研究后,我认为我已经找到了我需要的 Ray,但它似乎在 Pi 3 上不起作用。然后我查看了线程,我是这样实现的:

from threading import Thread

def update_temp():
    ## get the values of the thermometer paint new one to OLED every second

def update_speed():
    ## get the values from the GPS and paint every 10 seconds

if __name__ == '__main__':
    temp_thread = Thread(target = update_temp)
    speed_thread = Thread(target = update_speed)
    
    temp_thread.start()
    speed_thread.start()

现在,当我 运行 执行此操作时,这两个功能都可以很好地更新,但速度相当慢。我想用自定义字体绘制 OLED、读取传感器、与 GPS 通信等对它来说有点苛刻,但仍然:有没有一种方法可以按照我设置线程的方式加快速度? join() 有什么大惊小怪的?你可以看出我是新手!


编辑:这是完整的代码,包括函数中发生的事情。我已经移除了一些东西(LED,其他温度传感器)但现在留下了 运行s 的所有东西。谢谢!

from os import system
from threading import Thread
import glob
import serial
import subprocess
import urllib
import urllib.request
import urllib.parse
import array
import requests
from time import sleep
from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import sh1106
from PIL import ImageFont, Image, ImageDraw
import time
import board
import busio
import adafruit_bmp280
import subprocess
import RPi.GPIO as GPIO

################## config display ##################
device = sh1106(i2c(port=1, address=0x3C), rotate=0)
device.clear()

### setup different fonts
FA_solid = ImageFont.truetype('/home/pi/Desktop/fonts/fa-solid-900.ttf', 16)
text_large = ImageFont.truetype('/home/pi/Desktop/fonts/digital-7.ttf', 64)
text_medium = ImageFont.truetype('/home/pi/Desktop/fonts/digital-7.ttf', 28)
text_small = ImageFont.truetype('/home/pi/Desktop/fonts/digital-7.ttf', 12)
 
### Initialize drawing zone (aka entire screen)
output = Image.new("1", (128,64))
add_to_image = ImageDraw.Draw(output)

### coordinates always: padding-left, padding-top. the first pair of zone is always = start
# speed
speed_zone = [(0,0), (100,64)]
speed_start = (0,0)

# temp
temp_zone = [(100,48), (128,64)]
temp_start = (100,48)

# GPS status
icon_zone = [(108,0), (128,16)]
icon_start = (108,0)

# load icon
add_to_image.text(icon_start, "\uf252", font=FA_solid, fill="white")
device.display(output)

# usage
#add_to_image.rectangle(speed_zone, fill="black", outline = "black")
#add_to_image.text(speed_start, "\uf00c", font=FA_solid, fill="white")
#device.display(output)




################## config GPS and GPRS via FONA ##################
SECONDS_BTW_READS = 5
READINGS_PER_UPLOAD = 5
TARGET_URL = "https://some_url"




################## config external thermometer ##################
base_dir = '/sys/bus/w1/devices/'
device_folder = glob.glob(base_dir + '28*')[0]
device_file = device_folder + '/w1_slave'
 
def update_temp():
    while True:
        f = open(device_file, 'r')
        lines = f.readlines()
        f.close()
        equals_pos = lines[1].find('t=')
        if equals_pos != -1:
            temp_string = lines[1][equals_pos+2:]
            temp_c = round(float(temp_string) / 1000.0)
            add_to_image.rectangle(temp_zone, fill="black", outline = "black")
            add_to_image.text(temp_start, str(temp_c), font=text_medium, fill="white")
            device.display(output)
            time.sleep(30)



############################################
############################################
##########      Program start       ########
############################################
############################################



# Start PPPD
def openPPPD():
    print("Opening PPPD")
    # Check if PPPD is already running by looking at syslog output
    output1 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
    if b"secondary DNS address" not in output1 and b"locked" not in output1:
        while True:
            # Start the "fona" process
            subprocess.call("sudo pon fona", shell=True)
            sleep(2)
            output2 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
            if b"script failed" not in output2:
                break
    # Make sure the connection is working
    while True:
        output2 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
        output3 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -3", shell=True)
        if b"secondary DNS address" in output2 or b"secondary DNS address" in output3:
            return True
            print("PPPD opened successfully")

# Stop PPPD
def closePPPD():
    print("turning off cell connection")
    # Stop the "fona" process
    subprocess.call("sudo poff fona", shell=True)
    # Make sure connection was actually terminated
    while True:
        output = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
        if b"Exit" in output:
            return True

# Check for a GPS fix
def checkForFix():
    print ("checking for fix")
    add_to_image.rectangle(icon_zone, fill="black", outline = "black")
    add_to_image.text(icon_start, "\uf124", font=FA_solid, fill="white") #location icon
    device.display(output)
        
    # Start the serial connection
    ser=serial.Serial('/dev/serial0', 115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1)
    # Turn on the GPS
    ser.write(b"AT+CGNSPWR=1\r")
    ser.write(b"AT+CGNSPWR?\r")
    while True:
        response = ser.readline()
        if b" 1" in response:
            break
    # Ask for the navigation info parsed from NMEA sentences
    ser.write(b"AT+CGNSINF\r")
    while True:
            response = ser.readline()
            # Check if a fix was found
            if b"+CGNSINF: 1,1," in response:
                print ("fix found")
                print (response)
                add_to_image.rectangle(icon_zone, fill="black", outline = "black")
                device.display(output)
                return True
            
            # If a fix wasn't found, wait and try again
            if b"+CGNSINF: 1,0," in response:
                sleep(5)
                ser.write(b"AT+CGNSINF\r")
                print ("still looking for fix")
                add_to_image.rectangle(icon_zone, fill="black", outline = "black")
                add_to_image.text(icon_start, "\uf00d", font=FA_solid, fill="white") #X
                device.display(output)
            else:
                ser.write(b"AT+CGNSINF\r")

# Read the GPS data for Latitude and Longitude
def getCoord():
    # Start the serial connection
    ser=serial.Serial('/dev/serial0', 115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1)
    ser.write(b"AT+CGNSINF\r")
    while True:
        response = ser.readline()
        if b"+CGNSINF: 1," in response:
            # Split the reading by commas and return the parts referencing lat and long
            array = response.split(b",")
            lat = array[3]
            lon = array[4]
            time = array[2]
            speed = array[6]
            return (lat,lon,time,speed)

# Start the program by opening the cellular connection and creating a bucket for our data
def update_speed():
    if openPPPD():    
        GPS_DATA = {}
        while True:
            # Close the cellular connection
            if closePPPD():
                print ("closing connection")
                sleep(1)
            # The range is how many data points we'll collect before streaming
            for i in range(READINGS_PER_UPLOAD):
                # Make sure there's a GPS fix
                if checkForFix():
                    # Get lat and long
                    if getCoord():
                        latitude, longitude, time, speed = getCoord()
                        coord = str(latitude) + "," + str(longitude)
                        print ("Coordinates:", coord)
                        print ("Time:", time)
                        print ("Step", i+1, "out of",READINGS_PER_UPLOAD)
                        
                        add_to_image.rectangle(speed_zone, fill="black", outline = "black")
                        add_to_image.text(speed_start, str(round(float(speed))), font=text_large, fill="white")
                        device.display(output)
                        
                        GPS_DATA[i] = {'lat': latitude, 'long' : longitude, 'time' : time, 'speed' : speed}
                        sleep(SECONDS_BTW_READS)
                        
                # Turn the cellular connection on every READINGS_PER_UPLOAD reads
                if i == (READINGS_PER_UPLOAD-1):
                    print ("opening connection")
                    add_to_image.rectangle(icon_zone, fill="black", outline = "black")
                    add_to_image.text(icon_start, "\uf7c0", font=FA_solid, fill="white") #sat dish
                    device.display(output)

                    if openPPPD():
                        print ("streaming")                    
                        add_to_image.rectangle(icon_zone, fill="black", outline = "black")
                        add_to_image.text(icon_start, "\uf382", font=FA_solid, fill="white") #upload
                        device.display(output)
                        
                        url_values = urllib.parse.urlencode(GPS_DATA)
                        #print(url_values)

                        full_url = TARGET_URL + '?' + url_values
                        with urllib.request.urlopen(full_url) as response:
                            print(response)
                           
                        print ("streaming complete")
                        GPS_DATA = {}  
                        add_to_image.rectangle(icon_zone, fill="black", outline = "black")
                        add_to_image.text(icon_start, "\uf00c", font=FA_solid, fill="white") #check
                        device.display(output)


if __name__ == '__main__':
    temp_thread = Thread(target = update_temp)
    speed_thread = Thread(target = update_speed)
    
    temp_thread.start()
    speed_thread.start()
    
    speed_thread.join()

这里有很多需要改进的地方。我无法解决所有问题,只有一些建议:

首先,device.display()阻塞。不要在每次更改时都重新绘制,而是在必要时进行批量更新:

pending_redraw = False
def update_display():
    while True:
        # there is a potential race condition here, not critical
        if pending_redraw:
            pending_redraw = False
            device.display()
        time.sleep(0.1)

 # somewhere near the bottom:
 display_thread = Thread(target=update_display)
 display_thread.start()

温度线程 - 见内联评论:

# avoid magic constants, even as simple as 't='
def update_temp(temp_signature='t=', update_interval=30):
    # there is no need to open/close the file handler every time.
    # Moving open/close out of the loop:
    f = open(device_file, 'r')
    
    while True:
        # previously: lines[1] will fail if only one line was read 
        line = f.readline()
        # protip: instead of wrapping positive case in a huge IF,
        # return/continue early
        if temp_signature not in line:
            continue
        temp_string = line.split(temp_signature, 1)[-1]
        temp_c = round(float(temp_string) / 1000.0)
        add_to_image.rectangle(temp_zone, fill="black", outline = "black")
        add_to_image.text(temp_start, str(temp_c), font=text_medium, fill="white")
        pending_redraw = True

        # previously, if 't=' signature wasn't found, the thread 
        # immediately went to open/close the device handle
        # adding delay before that will save resources
        time.sleep(update_interval)

    lines = f.readlines()

openPPPDclosePPPD 有大量阻塞调用,但是 .. 这次将它们的优化留给您

使用 GPS:

# Move out of checkForFix - opening/closign the port is blocking and expensive
ser=serial.Serial('/dev/serial0', 115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1)
# Turn on the GPS
ser.write(b"AT+CGNSPWR=1\r")
ser.write(b"AT+CGNSPWR?\r")
# slightly optimized version. Note the added sleep
while b' 1' not in ser.readline(): time.sleep(0.1)

def checkForFix():
    # checkForFix() is called only called from update_speed(), 
    # which immediately redraws the screen after. Safe to skip redraw
    print ("checking for fix")
    add_to_image.rectangle(icon_zone, fill="black", outline = "black")
    add_to_image.text(icon_start, "\uf124", font=FA_solid, fill="white") #location icon

    while True:  # simplified the logic a little bit, saving few blocking writes
        ser.write(b"AT+CGNSINF\r")
        add_to_image.rectangle(icon_zone, fill="black", outline = "black")
        response = ser.readline()
        # Check if a fix was found
        if b"+CGNSINF: 1,1," in response:
            print ("fix found")
            print (response)
            pending_redraw = True
            return True
        
        # If a fix wasn't found, wait and try again
        if b"+CGNSINF: 1,0," in response:
            print("still looking for fix")
            add_to_image.rectangle(icon_zone, fill="black", outline = "black")
            add_to_image.text(icon_start, "\uf00d", font=FA_solid, fill="white") #X

线程速度:

# Start the program by opening the cellular connection and creating a bucket for our data
def update_speed():
    if not openPPPD():
        return  # again, return early 
    GPS_DATA = {}
    while True:
        # Close the cellular connection
        if closePPPD():  # 
            print ("closing connection")
            sleep(1)
        # The range is how many data points we'll collect before streaming
        for i in range(READINGS_PER_UPLOAD):
            # Make sure there's a GPS fix
            # two chained IFs - just use `and`
            if not (checkForFix() and getCoord()):
                continue
            # Get lat and long
            latitude, longitude, time, speed = getCoord()
            coord = str(latitude) + "," + str(longitude)
            print ("Coordinates:", coord)
            print ("Time:", time)
            print ("Step", i+1, "out of",READINGS_PER_UPLOAD)
                
            add_to_image.rectangle(speed_zone, fill="black", outline="black")
            add_to_image.text(speed_start, str(round(float(speed))), font=text_large, fill="white")
            pending_redraw = True
                    
            GPS_DATA[i] = {'lat': latitude, 'long' : longitude, 'time' : time, 'speed' : speed}
            sleep(SECONDS_BTW_READS)

        # Instead of checking for last iteration, just do it AFTER the loop 
        # Turn the cellular connection on every READINGS_PER_UPLOAD reads
        print ("opening connection")
        add_to_image.rectangle(icon_zone, fill="black", outline = "black")
        add_to_image.text(icon_start, "\uf7c0", font=FA_solid, fill="white") #sat dish
        pending_redraw = True

        if not openPPPD():
            continue  # return/continue early

        print ("streaming")                    
        add_to_image.rectangle(icon_zone, fill="black", outline = "black")
        add_to_image.text(icon_start, "\uf382", font=FA_solid, fill="white") #upload
        pending_redraw = True
                    
        url_values = urllib.parse.urlencode(GPS_DATA)
        #print(url_values)

        full_url = TARGET_URL + '?' + url_values
        with urllib.request.urlopen(full_url) as response:
            print(response)
                       
        print ("streaming complete")
        GPS_DATA = {}  
        add_to_image.rectangle(icon_zone, fill="black", outline = "black")
        add_to_image.text(icon_start, "\uf00c", font=FA_solid, fill="white") #check
        pending_redraw = True

主要优化:

  • 节省一些显示重绘
  • update_temp
  • 中跳过不必要的设备 open/close
  • 避免在 checkForFix
  • 中不必要地重新打开序列号

update_speed 中的变化主要是装饰性的

作为一般规则,最好 post 这样的东西 https://codereview.stackexchange.com/ 而不是 Whosebug