python 上的线程 raspberry pi 3 - 速度优化
python Threads on raspberry pi 3 - speed optimization
我正在尝试从两个传感器获取数据,每 1 秒一个,每 10 秒一个。
我有两个函数可以使用来自传感器的值更新小型 OLED 显示器。我希望永久拥有这两个功能 运行 以始终显示最新值。完成研究后,我认为我已经找到了我需要的 Ray,但它似乎在 Pi 3 上不起作用。然后我查看了线程,我是这样实现的:
from threading import Thread
def update_temp():
## get the values of the thermometer paint new one to OLED every second
def update_speed():
## get the values from the GPS and paint every 10 seconds
if __name__ == '__main__':
temp_thread = Thread(target = update_temp)
speed_thread = Thread(target = update_speed)
temp_thread.start()
speed_thread.start()
现在,当我 运行 执行此操作时,这两个功能都可以很好地更新,但速度相当慢。我想用自定义字体绘制 OLED、读取传感器、与 GPS 通信等对它来说有点苛刻,但仍然:有没有一种方法可以按照我设置线程的方式加快速度? join() 有什么大惊小怪的?你可以看出我是新手!
编辑:这是完整的代码,包括函数中发生的事情。我已经移除了一些东西(LED,其他温度传感器)但现在留下了 运行s 的所有东西。谢谢!
from os import system
from threading import Thread
import glob
import serial
import subprocess
import urllib
import urllib.request
import urllib.parse
import array
import requests
from time import sleep
from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import sh1106
from PIL import ImageFont, Image, ImageDraw
import time
import board
import busio
import adafruit_bmp280
import subprocess
import RPi.GPIO as GPIO
################## config display ##################
device = sh1106(i2c(port=1, address=0x3C), rotate=0)
device.clear()
### setup different fonts
FA_solid = ImageFont.truetype('/home/pi/Desktop/fonts/fa-solid-900.ttf', 16)
text_large = ImageFont.truetype('/home/pi/Desktop/fonts/digital-7.ttf', 64)
text_medium = ImageFont.truetype('/home/pi/Desktop/fonts/digital-7.ttf', 28)
text_small = ImageFont.truetype('/home/pi/Desktop/fonts/digital-7.ttf', 12)
### Initialize drawing zone (aka entire screen)
output = Image.new("1", (128,64))
add_to_image = ImageDraw.Draw(output)
### coordinates always: padding-left, padding-top. the first pair of zone is always = start
# speed
speed_zone = [(0,0), (100,64)]
speed_start = (0,0)
# temp
temp_zone = [(100,48), (128,64)]
temp_start = (100,48)
# GPS status
icon_zone = [(108,0), (128,16)]
icon_start = (108,0)
# load icon
add_to_image.text(icon_start, "\uf252", font=FA_solid, fill="white")
device.display(output)
# usage
#add_to_image.rectangle(speed_zone, fill="black", outline = "black")
#add_to_image.text(speed_start, "\uf00c", font=FA_solid, fill="white")
#device.display(output)
################## config GPS and GPRS via FONA ##################
SECONDS_BTW_READS = 5
READINGS_PER_UPLOAD = 5
TARGET_URL = "https://some_url"
################## config external thermometer ##################
base_dir = '/sys/bus/w1/devices/'
device_folder = glob.glob(base_dir + '28*')[0]
device_file = device_folder + '/w1_slave'
def update_temp():
while True:
f = open(device_file, 'r')
lines = f.readlines()
f.close()
equals_pos = lines[1].find('t=')
if equals_pos != -1:
temp_string = lines[1][equals_pos+2:]
temp_c = round(float(temp_string) / 1000.0)
add_to_image.rectangle(temp_zone, fill="black", outline = "black")
add_to_image.text(temp_start, str(temp_c), font=text_medium, fill="white")
device.display(output)
time.sleep(30)
############################################
############################################
########## Program start ########
############################################
############################################
# Start PPPD
def openPPPD():
print("Opening PPPD")
# Check if PPPD is already running by looking at syslog output
output1 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
if b"secondary DNS address" not in output1 and b"locked" not in output1:
while True:
# Start the "fona" process
subprocess.call("sudo pon fona", shell=True)
sleep(2)
output2 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
if b"script failed" not in output2:
break
# Make sure the connection is working
while True:
output2 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
output3 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -3", shell=True)
if b"secondary DNS address" in output2 or b"secondary DNS address" in output3:
return True
print("PPPD opened successfully")
# Stop PPPD
def closePPPD():
print("turning off cell connection")
# Stop the "fona" process
subprocess.call("sudo poff fona", shell=True)
# Make sure connection was actually terminated
while True:
output = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
if b"Exit" in output:
return True
# Check for a GPS fix
def checkForFix():
print ("checking for fix")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf124", font=FA_solid, fill="white") #location icon
device.display(output)
# Start the serial connection
ser=serial.Serial('/dev/serial0', 115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1)
# Turn on the GPS
ser.write(b"AT+CGNSPWR=1\r")
ser.write(b"AT+CGNSPWR?\r")
while True:
response = ser.readline()
if b" 1" in response:
break
# Ask for the navigation info parsed from NMEA sentences
ser.write(b"AT+CGNSINF\r")
while True:
response = ser.readline()
# Check if a fix was found
if b"+CGNSINF: 1,1," in response:
print ("fix found")
print (response)
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
device.display(output)
return True
# If a fix wasn't found, wait and try again
if b"+CGNSINF: 1,0," in response:
sleep(5)
ser.write(b"AT+CGNSINF\r")
print ("still looking for fix")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf00d", font=FA_solid, fill="white") #X
device.display(output)
else:
ser.write(b"AT+CGNSINF\r")
# Read the GPS data for Latitude and Longitude
def getCoord():
# Start the serial connection
ser=serial.Serial('/dev/serial0', 115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1)
ser.write(b"AT+CGNSINF\r")
while True:
response = ser.readline()
if b"+CGNSINF: 1," in response:
# Split the reading by commas and return the parts referencing lat and long
array = response.split(b",")
lat = array[3]
lon = array[4]
time = array[2]
speed = array[6]
return (lat,lon,time,speed)
# Start the program by opening the cellular connection and creating a bucket for our data
def update_speed():
if openPPPD():
GPS_DATA = {}
while True:
# Close the cellular connection
if closePPPD():
print ("closing connection")
sleep(1)
# The range is how many data points we'll collect before streaming
for i in range(READINGS_PER_UPLOAD):
# Make sure there's a GPS fix
if checkForFix():
# Get lat and long
if getCoord():
latitude, longitude, time, speed = getCoord()
coord = str(latitude) + "," + str(longitude)
print ("Coordinates:", coord)
print ("Time:", time)
print ("Step", i+1, "out of",READINGS_PER_UPLOAD)
add_to_image.rectangle(speed_zone, fill="black", outline = "black")
add_to_image.text(speed_start, str(round(float(speed))), font=text_large, fill="white")
device.display(output)
GPS_DATA[i] = {'lat': latitude, 'long' : longitude, 'time' : time, 'speed' : speed}
sleep(SECONDS_BTW_READS)
# Turn the cellular connection on every READINGS_PER_UPLOAD reads
if i == (READINGS_PER_UPLOAD-1):
print ("opening connection")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf7c0", font=FA_solid, fill="white") #sat dish
device.display(output)
if openPPPD():
print ("streaming")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf382", font=FA_solid, fill="white") #upload
device.display(output)
url_values = urllib.parse.urlencode(GPS_DATA)
#print(url_values)
full_url = TARGET_URL + '?' + url_values
with urllib.request.urlopen(full_url) as response:
print(response)
print ("streaming complete")
GPS_DATA = {}
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf00c", font=FA_solid, fill="white") #check
device.display(output)
if __name__ == '__main__':
temp_thread = Thread(target = update_temp)
speed_thread = Thread(target = update_speed)
temp_thread.start()
speed_thread.start()
speed_thread.join()
这里有很多需要改进的地方。我无法解决所有问题,只有一些建议:
首先,device.display()
阻塞。不要在每次更改时都重新绘制,而是在必要时进行批量更新:
pending_redraw = False
def update_display():
while True:
# there is a potential race condition here, not critical
if pending_redraw:
pending_redraw = False
device.display()
time.sleep(0.1)
# somewhere near the bottom:
display_thread = Thread(target=update_display)
display_thread.start()
温度线程 - 见内联评论:
# avoid magic constants, even as simple as 't='
def update_temp(temp_signature='t=', update_interval=30):
# there is no need to open/close the file handler every time.
# Moving open/close out of the loop:
f = open(device_file, 'r')
while True:
# previously: lines[1] will fail if only one line was read
line = f.readline()
# protip: instead of wrapping positive case in a huge IF,
# return/continue early
if temp_signature not in line:
continue
temp_string = line.split(temp_signature, 1)[-1]
temp_c = round(float(temp_string) / 1000.0)
add_to_image.rectangle(temp_zone, fill="black", outline = "black")
add_to_image.text(temp_start, str(temp_c), font=text_medium, fill="white")
pending_redraw = True
# previously, if 't=' signature wasn't found, the thread
# immediately went to open/close the device handle
# adding delay before that will save resources
time.sleep(update_interval)
lines = f.readlines()
openPPPD
和 closePPPD
有大量阻塞调用,但是 .. 这次将它们的优化留给您
使用 GPS:
# Move out of checkForFix - opening/closign the port is blocking and expensive
ser=serial.Serial('/dev/serial0', 115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1)
# Turn on the GPS
ser.write(b"AT+CGNSPWR=1\r")
ser.write(b"AT+CGNSPWR?\r")
# slightly optimized version. Note the added sleep
while b' 1' not in ser.readline(): time.sleep(0.1)
def checkForFix():
# checkForFix() is called only called from update_speed(),
# which immediately redraws the screen after. Safe to skip redraw
print ("checking for fix")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf124", font=FA_solid, fill="white") #location icon
while True: # simplified the logic a little bit, saving few blocking writes
ser.write(b"AT+CGNSINF\r")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
response = ser.readline()
# Check if a fix was found
if b"+CGNSINF: 1,1," in response:
print ("fix found")
print (response)
pending_redraw = True
return True
# If a fix wasn't found, wait and try again
if b"+CGNSINF: 1,0," in response:
print("still looking for fix")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf00d", font=FA_solid, fill="white") #X
线程速度:
# Start the program by opening the cellular connection and creating a bucket for our data
def update_speed():
if not openPPPD():
return # again, return early
GPS_DATA = {}
while True:
# Close the cellular connection
if closePPPD(): #
print ("closing connection")
sleep(1)
# The range is how many data points we'll collect before streaming
for i in range(READINGS_PER_UPLOAD):
# Make sure there's a GPS fix
# two chained IFs - just use `and`
if not (checkForFix() and getCoord()):
continue
# Get lat and long
latitude, longitude, time, speed = getCoord()
coord = str(latitude) + "," + str(longitude)
print ("Coordinates:", coord)
print ("Time:", time)
print ("Step", i+1, "out of",READINGS_PER_UPLOAD)
add_to_image.rectangle(speed_zone, fill="black", outline="black")
add_to_image.text(speed_start, str(round(float(speed))), font=text_large, fill="white")
pending_redraw = True
GPS_DATA[i] = {'lat': latitude, 'long' : longitude, 'time' : time, 'speed' : speed}
sleep(SECONDS_BTW_READS)
# Instead of checking for last iteration, just do it AFTER the loop
# Turn the cellular connection on every READINGS_PER_UPLOAD reads
print ("opening connection")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf7c0", font=FA_solid, fill="white") #sat dish
pending_redraw = True
if not openPPPD():
continue # return/continue early
print ("streaming")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf382", font=FA_solid, fill="white") #upload
pending_redraw = True
url_values = urllib.parse.urlencode(GPS_DATA)
#print(url_values)
full_url = TARGET_URL + '?' + url_values
with urllib.request.urlopen(full_url) as response:
print(response)
print ("streaming complete")
GPS_DATA = {}
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf00c", font=FA_solid, fill="white") #check
pending_redraw = True
主要优化:
- 节省一些显示重绘
- 在
update_temp
中跳过不必要的设备 open/close
- 避免在
checkForFix
中不必要地重新打开序列号
update_speed
中的变化主要是装饰性的
作为一般规则,最好 post 这样的东西 https://codereview.stackexchange.com/ 而不是 Whosebug
我正在尝试从两个传感器获取数据,每 1 秒一个,每 10 秒一个。
我有两个函数可以使用来自传感器的值更新小型 OLED 显示器。我希望永久拥有这两个功能 运行 以始终显示最新值。完成研究后,我认为我已经找到了我需要的 Ray,但它似乎在 Pi 3 上不起作用。然后我查看了线程,我是这样实现的:
from threading import Thread
def update_temp():
## get the values of the thermometer paint new one to OLED every second
def update_speed():
## get the values from the GPS and paint every 10 seconds
if __name__ == '__main__':
temp_thread = Thread(target = update_temp)
speed_thread = Thread(target = update_speed)
temp_thread.start()
speed_thread.start()
现在,当我 运行 执行此操作时,这两个功能都可以很好地更新,但速度相当慢。我想用自定义字体绘制 OLED、读取传感器、与 GPS 通信等对它来说有点苛刻,但仍然:有没有一种方法可以按照我设置线程的方式加快速度? join() 有什么大惊小怪的?你可以看出我是新手!
编辑:这是完整的代码,包括函数中发生的事情。我已经移除了一些东西(LED,其他温度传感器)但现在留下了 运行s 的所有东西。谢谢!
from os import system
from threading import Thread
import glob
import serial
import subprocess
import urllib
import urllib.request
import urllib.parse
import array
import requests
from time import sleep
from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import sh1106
from PIL import ImageFont, Image, ImageDraw
import time
import board
import busio
import adafruit_bmp280
import subprocess
import RPi.GPIO as GPIO
################## config display ##################
device = sh1106(i2c(port=1, address=0x3C), rotate=0)
device.clear()
### setup different fonts
FA_solid = ImageFont.truetype('/home/pi/Desktop/fonts/fa-solid-900.ttf', 16)
text_large = ImageFont.truetype('/home/pi/Desktop/fonts/digital-7.ttf', 64)
text_medium = ImageFont.truetype('/home/pi/Desktop/fonts/digital-7.ttf', 28)
text_small = ImageFont.truetype('/home/pi/Desktop/fonts/digital-7.ttf', 12)
### Initialize drawing zone (aka entire screen)
output = Image.new("1", (128,64))
add_to_image = ImageDraw.Draw(output)
### coordinates always: padding-left, padding-top. the first pair of zone is always = start
# speed
speed_zone = [(0,0), (100,64)]
speed_start = (0,0)
# temp
temp_zone = [(100,48), (128,64)]
temp_start = (100,48)
# GPS status
icon_zone = [(108,0), (128,16)]
icon_start = (108,0)
# load icon
add_to_image.text(icon_start, "\uf252", font=FA_solid, fill="white")
device.display(output)
# usage
#add_to_image.rectangle(speed_zone, fill="black", outline = "black")
#add_to_image.text(speed_start, "\uf00c", font=FA_solid, fill="white")
#device.display(output)
################## config GPS and GPRS via FONA ##################
SECONDS_BTW_READS = 5
READINGS_PER_UPLOAD = 5
TARGET_URL = "https://some_url"
################## config external thermometer ##################
base_dir = '/sys/bus/w1/devices/'
device_folder = glob.glob(base_dir + '28*')[0]
device_file = device_folder + '/w1_slave'
def update_temp():
while True:
f = open(device_file, 'r')
lines = f.readlines()
f.close()
equals_pos = lines[1].find('t=')
if equals_pos != -1:
temp_string = lines[1][equals_pos+2:]
temp_c = round(float(temp_string) / 1000.0)
add_to_image.rectangle(temp_zone, fill="black", outline = "black")
add_to_image.text(temp_start, str(temp_c), font=text_medium, fill="white")
device.display(output)
time.sleep(30)
############################################
############################################
########## Program start ########
############################################
############################################
# Start PPPD
def openPPPD():
print("Opening PPPD")
# Check if PPPD is already running by looking at syslog output
output1 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
if b"secondary DNS address" not in output1 and b"locked" not in output1:
while True:
# Start the "fona" process
subprocess.call("sudo pon fona", shell=True)
sleep(2)
output2 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
if b"script failed" not in output2:
break
# Make sure the connection is working
while True:
output2 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
output3 = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -3", shell=True)
if b"secondary DNS address" in output2 or b"secondary DNS address" in output3:
return True
print("PPPD opened successfully")
# Stop PPPD
def closePPPD():
print("turning off cell connection")
# Stop the "fona" process
subprocess.call("sudo poff fona", shell=True)
# Make sure connection was actually terminated
while True:
output = subprocess.check_output("cat /var/log/syslog | grep pppd | tail -1", shell=True)
if b"Exit" in output:
return True
# Check for a GPS fix
def checkForFix():
print ("checking for fix")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf124", font=FA_solid, fill="white") #location icon
device.display(output)
# Start the serial connection
ser=serial.Serial('/dev/serial0', 115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1)
# Turn on the GPS
ser.write(b"AT+CGNSPWR=1\r")
ser.write(b"AT+CGNSPWR?\r")
while True:
response = ser.readline()
if b" 1" in response:
break
# Ask for the navigation info parsed from NMEA sentences
ser.write(b"AT+CGNSINF\r")
while True:
response = ser.readline()
# Check if a fix was found
if b"+CGNSINF: 1,1," in response:
print ("fix found")
print (response)
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
device.display(output)
return True
# If a fix wasn't found, wait and try again
if b"+CGNSINF: 1,0," in response:
sleep(5)
ser.write(b"AT+CGNSINF\r")
print ("still looking for fix")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf00d", font=FA_solid, fill="white") #X
device.display(output)
else:
ser.write(b"AT+CGNSINF\r")
# Read the GPS data for Latitude and Longitude
def getCoord():
# Start the serial connection
ser=serial.Serial('/dev/serial0', 115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1)
ser.write(b"AT+CGNSINF\r")
while True:
response = ser.readline()
if b"+CGNSINF: 1," in response:
# Split the reading by commas and return the parts referencing lat and long
array = response.split(b",")
lat = array[3]
lon = array[4]
time = array[2]
speed = array[6]
return (lat,lon,time,speed)
# Start the program by opening the cellular connection and creating a bucket for our data
def update_speed():
if openPPPD():
GPS_DATA = {}
while True:
# Close the cellular connection
if closePPPD():
print ("closing connection")
sleep(1)
# The range is how many data points we'll collect before streaming
for i in range(READINGS_PER_UPLOAD):
# Make sure there's a GPS fix
if checkForFix():
# Get lat and long
if getCoord():
latitude, longitude, time, speed = getCoord()
coord = str(latitude) + "," + str(longitude)
print ("Coordinates:", coord)
print ("Time:", time)
print ("Step", i+1, "out of",READINGS_PER_UPLOAD)
add_to_image.rectangle(speed_zone, fill="black", outline = "black")
add_to_image.text(speed_start, str(round(float(speed))), font=text_large, fill="white")
device.display(output)
GPS_DATA[i] = {'lat': latitude, 'long' : longitude, 'time' : time, 'speed' : speed}
sleep(SECONDS_BTW_READS)
# Turn the cellular connection on every READINGS_PER_UPLOAD reads
if i == (READINGS_PER_UPLOAD-1):
print ("opening connection")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf7c0", font=FA_solid, fill="white") #sat dish
device.display(output)
if openPPPD():
print ("streaming")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf382", font=FA_solid, fill="white") #upload
device.display(output)
url_values = urllib.parse.urlencode(GPS_DATA)
#print(url_values)
full_url = TARGET_URL + '?' + url_values
with urllib.request.urlopen(full_url) as response:
print(response)
print ("streaming complete")
GPS_DATA = {}
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf00c", font=FA_solid, fill="white") #check
device.display(output)
if __name__ == '__main__':
temp_thread = Thread(target = update_temp)
speed_thread = Thread(target = update_speed)
temp_thread.start()
speed_thread.start()
speed_thread.join()
这里有很多需要改进的地方。我无法解决所有问题,只有一些建议:
首先,device.display()
阻塞。不要在每次更改时都重新绘制,而是在必要时进行批量更新:
pending_redraw = False
def update_display():
while True:
# there is a potential race condition here, not critical
if pending_redraw:
pending_redraw = False
device.display()
time.sleep(0.1)
# somewhere near the bottom:
display_thread = Thread(target=update_display)
display_thread.start()
温度线程 - 见内联评论:
# avoid magic constants, even as simple as 't='
def update_temp(temp_signature='t=', update_interval=30):
# there is no need to open/close the file handler every time.
# Moving open/close out of the loop:
f = open(device_file, 'r')
while True:
# previously: lines[1] will fail if only one line was read
line = f.readline()
# protip: instead of wrapping positive case in a huge IF,
# return/continue early
if temp_signature not in line:
continue
temp_string = line.split(temp_signature, 1)[-1]
temp_c = round(float(temp_string) / 1000.0)
add_to_image.rectangle(temp_zone, fill="black", outline = "black")
add_to_image.text(temp_start, str(temp_c), font=text_medium, fill="white")
pending_redraw = True
# previously, if 't=' signature wasn't found, the thread
# immediately went to open/close the device handle
# adding delay before that will save resources
time.sleep(update_interval)
lines = f.readlines()
openPPPD
和 closePPPD
有大量阻塞调用,但是 .. 这次将它们的优化留给您
使用 GPS:
# Move out of checkForFix - opening/closign the port is blocking and expensive
ser=serial.Serial('/dev/serial0', 115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1)
# Turn on the GPS
ser.write(b"AT+CGNSPWR=1\r")
ser.write(b"AT+CGNSPWR?\r")
# slightly optimized version. Note the added sleep
while b' 1' not in ser.readline(): time.sleep(0.1)
def checkForFix():
# checkForFix() is called only called from update_speed(),
# which immediately redraws the screen after. Safe to skip redraw
print ("checking for fix")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf124", font=FA_solid, fill="white") #location icon
while True: # simplified the logic a little bit, saving few blocking writes
ser.write(b"AT+CGNSINF\r")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
response = ser.readline()
# Check if a fix was found
if b"+CGNSINF: 1,1," in response:
print ("fix found")
print (response)
pending_redraw = True
return True
# If a fix wasn't found, wait and try again
if b"+CGNSINF: 1,0," in response:
print("still looking for fix")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf00d", font=FA_solid, fill="white") #X
线程速度:
# Start the program by opening the cellular connection and creating a bucket for our data
def update_speed():
if not openPPPD():
return # again, return early
GPS_DATA = {}
while True:
# Close the cellular connection
if closePPPD(): #
print ("closing connection")
sleep(1)
# The range is how many data points we'll collect before streaming
for i in range(READINGS_PER_UPLOAD):
# Make sure there's a GPS fix
# two chained IFs - just use `and`
if not (checkForFix() and getCoord()):
continue
# Get lat and long
latitude, longitude, time, speed = getCoord()
coord = str(latitude) + "," + str(longitude)
print ("Coordinates:", coord)
print ("Time:", time)
print ("Step", i+1, "out of",READINGS_PER_UPLOAD)
add_to_image.rectangle(speed_zone, fill="black", outline="black")
add_to_image.text(speed_start, str(round(float(speed))), font=text_large, fill="white")
pending_redraw = True
GPS_DATA[i] = {'lat': latitude, 'long' : longitude, 'time' : time, 'speed' : speed}
sleep(SECONDS_BTW_READS)
# Instead of checking for last iteration, just do it AFTER the loop
# Turn the cellular connection on every READINGS_PER_UPLOAD reads
print ("opening connection")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf7c0", font=FA_solid, fill="white") #sat dish
pending_redraw = True
if not openPPPD():
continue # return/continue early
print ("streaming")
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf382", font=FA_solid, fill="white") #upload
pending_redraw = True
url_values = urllib.parse.urlencode(GPS_DATA)
#print(url_values)
full_url = TARGET_URL + '?' + url_values
with urllib.request.urlopen(full_url) as response:
print(response)
print ("streaming complete")
GPS_DATA = {}
add_to_image.rectangle(icon_zone, fill="black", outline = "black")
add_to_image.text(icon_start, "\uf00c", font=FA_solid, fill="white") #check
pending_redraw = True
主要优化:
- 节省一些显示重绘
- 在
update_temp
中跳过不必要的设备 open/close
- 避免在
checkForFix
中不必要地重新打开序列号
update_speed
中的变化主要是装饰性的
作为一般规则,最好 post 这样的东西 https://codereview.stackexchange.com/ 而不是 Whosebug