在 windows 接收时 Digi-Xbee 数据包积压
Digi-Xbee Packet Backlog when receiving on windows
我们 运行 在 Windows 上接收 ZigBee 数据包时遇到了一个奇怪的问题。
我们目前每 0.8 秒从无人机发送一次数据包,但在 Windows 我们每 ~5 秒接收一次数据包。我们没有丢失数据包,因为接收到的数据包 ID 正确增加了。
st运行ge 的事情是,当在同一台计算机上使用 Ubuntu VM 时,它运行完美,我们在大约 0.8s 时收到。
我们正在使用 Digi-Xbee 1.4 和通过 USB 连接的“Xbee SMT Grove 开发板”进行发送和接收。
这是用于在 Windows 和 Ubuntu 上接收的代码:
def connect(self):
self._device = XBeeDevice(self._port, self._baudRate)
self._device.open()
self._device.set_16bit_addr(XBee16BitAddress(utils.int_to_bytes(int(self._address))))
self._network = self._device.get_network()
self._device.add_packet_received_callback(self._packetReceivedCallback)
def _packetReceivedCallback(self, packet):
print("Received!")
#Processing after this
有没有人遇到过这种行为?
好的,以防万一有人遇到同样的问题,我们基本上通过实现自己的串行接口来解决它。这种方法非常基础,如果您需要 Digi-Xbee Python 的更多高级功能,它可能对您没有帮助。
在我们的例子中,我们发送 json.dumps() 作为数据,所以我们可以用 {} 分隔它。您可能需要针对其他数据类型更改它。
from digi.xbee.models.options import TransmitOptions
from digi.xbee.models.address import XBee64BitAddress
from digi.xbee.exception import XBeeException
from digi.xbee.packets.raw import TX64Packet
from threading import Thread
import serial
class ZigbeeConnection():
def __init__(self, port, baudRate = 230400):
self._port = port
self._baudRate = baudRate
self._packetID = 0
self._serialPort = serial.Serial(port=self._port,baudrate= self._baudRate )
self._readActive = True
self._T = Thread(target=self._startReading, daemon=True)
self._T.start()
def _startReading(self):
while(self._readActive):
rawbytes = self._serialPort.read_until(expected=b"\x02")
data=(str(rawbytes)[2:str(rawbytes).rfind("}")+1])
print(data)
def sendBroadCastMessage(self,msg:str):
try:
msg = msg.encode("utf-8")
packet = TX64Packet(self._packetID, XBee64BitAddress.BROADCAST_ADDRESS,
TransmitOptions.NONE.value, rf_data=msg)
self._serialPort.write(packet.output())
self._packetID += 1
except XBeeException as e:
pass
我们 运行 在 Windows 上接收 ZigBee 数据包时遇到了一个奇怪的问题。 我们目前每 0.8 秒从无人机发送一次数据包,但在 Windows 我们每 ~5 秒接收一次数据包。我们没有丢失数据包,因为接收到的数据包 ID 正确增加了。 st运行ge 的事情是,当在同一台计算机上使用 Ubuntu VM 时,它运行完美,我们在大约 0.8s 时收到。
我们正在使用 Digi-Xbee 1.4 和通过 USB 连接的“Xbee SMT Grove 开发板”进行发送和接收。
这是用于在 Windows 和 Ubuntu 上接收的代码:
def connect(self):
self._device = XBeeDevice(self._port, self._baudRate)
self._device.open()
self._device.set_16bit_addr(XBee16BitAddress(utils.int_to_bytes(int(self._address))))
self._network = self._device.get_network()
self._device.add_packet_received_callback(self._packetReceivedCallback)
def _packetReceivedCallback(self, packet):
print("Received!")
#Processing after this
有没有人遇到过这种行为?
好的,以防万一有人遇到同样的问题,我们基本上通过实现自己的串行接口来解决它。这种方法非常基础,如果您需要 Digi-Xbee Python 的更多高级功能,它可能对您没有帮助。 在我们的例子中,我们发送 json.dumps() 作为数据,所以我们可以用 {} 分隔它。您可能需要针对其他数据类型更改它。
from digi.xbee.models.options import TransmitOptions
from digi.xbee.models.address import XBee64BitAddress
from digi.xbee.exception import XBeeException
from digi.xbee.packets.raw import TX64Packet
from threading import Thread
import serial
class ZigbeeConnection():
def __init__(self, port, baudRate = 230400):
self._port = port
self._baudRate = baudRate
self._packetID = 0
self._serialPort = serial.Serial(port=self._port,baudrate= self._baudRate )
self._readActive = True
self._T = Thread(target=self._startReading, daemon=True)
self._T.start()
def _startReading(self):
while(self._readActive):
rawbytes = self._serialPort.read_until(expected=b"\x02")
data=(str(rawbytes)[2:str(rawbytes).rfind("}")+1])
print(data)
def sendBroadCastMessage(self,msg:str):
try:
msg = msg.encode("utf-8")
packet = TX64Packet(self._packetID, XBee64BitAddress.BROADCAST_ADDRESS,
TransmitOptions.NONE.value, rf_data=msg)
self._serialPort.write(packet.output())
self._packetID += 1
except XBeeException as e:
pass