Pi Zero W 连接到两个外围设备(GPIO 和 USB):如何同时从两个外围设备连续读取?

PiZero W connected to two peripherals (GPIO and USB): how to continuously read from both at same time?

我有一个 raspberry pizero W,它通过 GPIO 引脚连接到流量计,通过 USB 连接到条形码扫描仪。我有 python 脚本,它使用回调函数在检测到 GPIO 输入时发出警报。此 python 脚本需要在 pizero 上连续 运行 运行,以便识别流量计何时激活并处理输入。

问题是我还有一个条形码扫描仪通过 USB 连接到 pizero。我希望 pizero 也能识别何时扫描条形码并处理该输入。

pizero 然后应该发送一条消息,其中包含来自流量计的信息和来自条形码扫描仪的信息。

有没有办法在同一个 python 脚本中做到这一点?我怎样才能让 pizero 同时监听和处理两个输入?将其分成两个不同的脚本是否更容易实现,如果是这样,我可以同时 运行 它们并以某种方式统一它们在第 3 个连续 运行ning 脚本中提供的信息吗?

谢谢!

根据评论进行一些澄清(感谢您的输入):

系统启动时脚本需要运行。我会看看 systemctl,因为直到有人提到它我才听说过它。

当流量计未连接时,Pi 通常会将正在扫描的条形码识别为键盘输入(即一系列数字后跟一个换行符)。

当我发送包含流量计和条形码信息的消息时,我需要从 python 发送一个 JSON 对象,其中包含信息片段和信息发送时间的时间戳已收到。

这个 JSON 对象将通过 wifi 发送到 raspberry pi 服务器,该服务器具有与 pizero 相同的家庭网络上的静态 ip。 raspberry pi 服务器可以访问 Django 数据库,该数据库应将 JSON 对象信息合并到数据库中。

更新答案

我已经为条形码添加了一些代码reader。我这样做是为了让条形码 reader 花费可变的时间,最多需要 5 秒来获取读数,而流量计需要恒定的 0.5 秒,因此您可以看到不同的线程以不同的速率独立于一个线程另一个。

#!/usr/bin/env python3

from threading import Lock
import threading
import time
from random import seed
from random import random

# Dummy function to read SPI as I don't have anything attached
def readSPI():
    # Take 0.5s to read
    time.sleep(0.5)
    readSPI.static += 1
    return readSPI.static
readSPI.static=0

class FlowMeter(threading.Thread):
    def __init__(self):
        super(FlowMeter, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read flowmeter and safely update self.currentReading
        while True:
            value = readSPI()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

# Dummy function to read Barcode as I don't have anything attached
def readBarcode():
    # Take variable time, 0..5 seconds, to read
    time.sleep(random()*5)
    result = "BC" + str(int(random()*1000))
    return result

class BarcodeReader(threading.Thread):
    def __init__(self):
        super(BarcodeReader, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read barcode and safely update self.currentReading
        while True:
            value = readBarcode()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

if __name__ == '__main__':

    # Generate repeatable random numbers
    seed(42)

    # Instantiate and start flow meter manager thread
    fmThread = FlowMeter()
    fmThread.daemon = True
    fmThread.start()

    # Instantiate and start barcode reader thread
    bcThread = BarcodeReader()
    bcThread.daemon = True
    bcThread.start()

    # Now you can do other things in main, but always get access to latest readings
    for i in range(20):
        fmReading = fmThread.read()
        bcReading = bcThread.read()
        print(f"Main: i = {i} FlowMeter reading = {fmReading}, Barcode={bcReading}")
        time.sleep(1)

示例输出

Main: i = 0 FlowMeter reading = 0, Barcode=0
Main: i = 1 FlowMeter reading = 1, Barcode=0
Main: i = 2 FlowMeter reading = 3, Barcode=0
Main: i = 3 FlowMeter reading = 5, Barcode=0
Main: i = 4 FlowMeter reading = 7, Barcode=BC25
Main: i = 5 FlowMeter reading = 9, Barcode=BC223
Main: i = 6 FlowMeter reading = 11, Barcode=BC223
Main: i = 7 FlowMeter reading = 13, Barcode=BC223
Main: i = 8 FlowMeter reading = 15, Barcode=BC223
Main: i = 9 FlowMeter reading = 17, Barcode=BC676
Main: i = 10 FlowMeter reading = 19, Barcode=BC676
Main: i = 11 FlowMeter reading = 21, Barcode=BC676
Main: i = 12 FlowMeter reading = 23, Barcode=BC676
Main: i = 13 FlowMeter reading = 25, Barcode=BC86
Main: i = 14 FlowMeter reading = 27, Barcode=BC86
Main: i = 15 FlowMeter reading = 29, Barcode=BC29
Main: i = 16 FlowMeter reading = 31, Barcode=BC505
Main: i = 17 FlowMeter reading = 33, Barcode=BC198
Main: i = 18 FlowMeter reading = 35, Barcode=BC198
Main: i = 19 FlowMeter reading = 37, Barcode=BC198

原答案

我建议您查看 systemdsystemctl 以在每次系统启动时启动您的应用程序 - 例如 here.

关于同时监控两件事,我建议您使用 Python 的 threading 模块。这是一个简单的例子,我创建了一个从 threading 子类化的对象,它通过不断读取流量计并将当前值保存在主程序可以随时读取的变量中来管理流量计。您可以启动另一个类似的管理您的条形码 reader 和 运行 我平行。我不想那样做,并用双倍的代码把你弄糊涂了。

#!/usr/bin/env python3

from threading import Lock
import threading
import time

# Dummy function to read SPI as I don't have anything attached
def readSPI():
    readSPI.static += 1
    return readSPI.static
readSPI.static=0

class FlowMeter(threading.Thread):
    def __init__(self):
        super(FlowMeter, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read flowmeter and safely update self.currentReading
        while True:
            value = readSPI()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()
            time.sleep(0.01)

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

if __name__ == '__main__':

    # Instantiate and start flow meter manager thread
    fmThread = FlowMeter()
    fmThread.start()

    # Now you can do other things in main, but always get access to latest reading
    for i in range(100000):
        fmReading = fmThread.read()
        print(f"Main: i = {i} FlowMeter reading = {fmReading}")
        time.sleep(1)

您可以考虑使用 logging 来协调和统一您的调试和日志消息 - 请参阅 here

您可以查看 events 让其他线程知道当某些事情达到临界水平时需要做些什么 - 例如 here

另一个可能更简单的选择是使用 RedisRedis 是一个非常高性能的内存数据结构服务器。在 Raspberry Pi、Mac、Linux Windows 或其他机器上安装很简单。它允许您在网络中任意数量的客户端之间共享原子整数、字符串、列表、散列、队列、集合和有序集合。

所以这个概念可能是让一个单独的程序监控流量计,并根据需要将当前读数填充到 Redis 中。然后另一个单独的程序读取条形码并将它们填充到 Redis 中,只要你喜欢就可以。最后,有一个控制程序,可能在您网络上的其他地方,可以随时获取这两个值。

请注意,您可以 运行 Redis 服务器在您的 Raspberry Pi 或任何其他机器上。

所以,这里是流量计程序 - 只需将 host 更改为机器 运行ning Redis 的 IP 地址:

#!/usr/bin/env python3

import redis
import time

host='localhost'
port=6379

# Connect to Redis
r = redis.Redis(host,port)

reading = 0
while True:
   # Generate synthetic reading that just increases every 500ms
   reading +=1 
   # Stuff reading into Redis as "fmReading"
   r.set("fmReading",reading)
   time.sleep(0.5)

这里是条码读取程序:

#!/usr/bin/env python3

import redis
import time
from random import random, seed

host='localhost'
port=6379

# Connect to local Redis server
r = redis.Redis(host,port)

# Generate repeatable random numbers
seed(42)

while True:
   # Synthesize barcode and change every 2 seconds
   barcode = "BC" + str(int((random()*1000)))
   # Stuff barcode into Redis as "barcode"
   r.set("barcode",barcode)
   time.sleep(2)

这里是主控程序:

#!/usr/bin/env python3

import redis
import time

host='localhost'
port=6379

# Connect to Redis server
r = redis.Redis(host,port)

while True:
   # Grab latest flowmeter reading and barcode
   fmReading = r.get("fmReading")
   barcode   = r.get("barcode")
   print(f"Main: fmReading={fmReading}, barcode={barcode}")
   time.sleep(1)

示例输出

Main: fmReading=b'10', barcode=b'BC676'
Main: fmReading=b'12', barcode=b'BC892'
Main: fmReading=b'14', barcode=b'BC892'
Main: fmReading=b'16', barcode=b'BC86'
Main: fmReading=b'18', barcode=b'BC86'
Main: fmReading=b'20', barcode=b'BC421'
Main: fmReading=b'22', barcode=b'BC421'
Main: fmReading=b'24', barcode=b'BC29'

请注意,为了对此进行调试,您还可以使用命令行界面从网络上的任何机器上获取任何读数 Redis,例如在终端:

redis-cli get barcode
"BC775"

如果您想在用 PHP 编写的 Web 浏览器中显示这些值,您也可以使用 PHP 绑定到 Redis 来获取这些值 - 非常方便!

当然,您可以调整程序以发送每个读数的时间戳 - 可以使用 Redis hash 而不是简单的密钥。您还可以使用 Redis LPUSHBRPOP.

实现队列以在程序之间发送消息

关键字: Redis, list, queue, hash, Raspberry Pi