避免在 udp 套接字中积累数据或从 udp 套接字读取最新数据

Avoid accumulation of data in udp socket or read newest data from udp socket

我正在尝试将数据从 C++ 代码连续发送到 python 代码。我使用 udp 套接字发送数据。发送速率比接收速率更快,因为它是一个简单的传感器代码。所以发送的数据都是在socket中累积的。当我尝试读取数据时,它 returns 是一个旧数据。如何在发送新数据时从socket读取最新数据或删除旧数据?

How can I read the newest data from the socket or delete the old data when the new data is sent?

从套接字中读取一个数据包并将其放入缓冲区。继续从套接字中读取数据包,每次都将每个数据包放入缓冲区(替换之前缓冲区中的任何数据包数据),直到没有更多数据可供读取 - non-blocking-I/O 模式对此很有用,作为非阻塞 recv() 将抛出一个 socket.error 异常,代码为 EWOULDBLOCK 当你 运行 套接字的传入数据缓冲区中的数据不足时。读取完所有数据后,缓冲区中剩下的就是最新数据,因此请继续使用该数据。

Sketch/example 代码如下(未经测试,可能包含错误):

  sock = socket.socket(family, socket.SOCK_DGRAM)

  [... bind socket, etc... ]

  # Receive-UDP-data event-loop begins here
  sock.setblocking(False)
  while True:
     newestData = None

     keepReceiving = True
     while keepReceiving:
        try:
           data, fromAddr = sock.recvfrom(2048)
           if data:
              newestData = data
        except socket.error as why:
           if why.args[0] == EWOULDBLOCK:
              keepReceiving = False
           else:
              raise why

     if (newestData):
        # code to handle/parse (newestData) here

发布者的数据进入缓冲区,较慢的订阅者以先进先出的方式逐条读取缓冲区。为了使其成为后进先出(也许这个描述不准确,因为在你的情况下我们只关心最后的数据),你可以用 asyncio create_datagram_endpoint() 修改 UDP 协议所以订阅者清除当前缓冲区队列并从新队列接收最新数据。

下面是一个例子,我在macOS 11.4上用Python 3.9.6.

测试过

UdpProtocol.py为自定义的UDP协议对象。

import asyncio
class UdpProtocol:
    def __init__(self):
        self.packets = asyncio.Queue()

    def connection_made(self, transport):
        print("connection made")

    def datagram_received(self, data, addr):
        # clear the current queue and the accumulated data
        self.packets._queue.clear()
        # put latest data to the queue
        self.packets.put_nowait((data, addr))

    def connection_lost(self, transport):
        print("connection lost")

    def error_received(self, exc):
        pass

    async def recvfrom(self):
        # get data from the queue
        return await self.packets.get()

这是发布者。

import asyncio
from UdpProtocol import UdpProtocol

async def main():
    server_address = ("127.0.0.1", 8000)
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        UdpProtocol, local_addr=None, remote_addr=server_address)

    idx = 0
    while True:
        transport.sendto(str(idx).encode(), server_address)
        print(idx)
        idx += 1
        await asyncio.sleep(0.1)

if __name__ == "__main__":
    asyncio.run(main())

这是订阅者。

import asyncio
from UdpProtocol import UdpProtocol

async def main():
    server_address = ("127.0.0.1", 8000)
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        UdpProtocol, local_addr=server_address, remote_addr=None)
    while True:
        data, addr = await protocol.recvfrom()
        print(data, addr)
        await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(main())

我使用了[这里] (Buffer size for reading UDP packets in Python) 的想法并调整了 python 套接字中的接收缓冲区。线程中还讨论了一些额外的要点。

sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)

这对我的应用程序很有效。至于我的应用程序,我将数据从 LabVIEW 发送到 python 并且数据不应累积。在我的情况下,丢失数据包不会影响我的应用程序,所以我基本上减少了接收缓冲区并使套接字超时。