避免在 udp 套接字中积累数据或从 udp 套接字读取最新数据
Avoid accumulation of data in udp socket or read newest data from udp socket
我正在尝试将数据从 C++ 代码连续发送到 python 代码。我使用 udp 套接字发送数据。发送速率比接收速率更快,因为它是一个简单的传感器代码。所以发送的数据都是在socket中累积的。当我尝试读取数据时,它 returns 是一个旧数据。如何在发送新数据时从socket读取最新数据或删除旧数据?
How can I read the newest data from the socket or delete the old data
when the new data is sent?
从套接字中读取一个数据包并将其放入缓冲区。继续从套接字中读取数据包,每次都将每个数据包放入缓冲区(替换之前缓冲区中的任何数据包数据),直到没有更多数据可供读取 - non-blocking-I/O 模式对此很有用,作为非阻塞 recv()
将抛出一个 socket.error 异常,代码为 EWOULDBLOCK
当你 运行 套接字的传入数据缓冲区中的数据不足时。读取完所有数据后,缓冲区中剩下的就是最新数据,因此请继续使用该数据。
Sketch/example 代码如下(未经测试,可能包含错误):
sock = socket.socket(family, socket.SOCK_DGRAM)
[... bind socket, etc... ]
# Receive-UDP-data event-loop begins here
sock.setblocking(False)
while True:
newestData = None
keepReceiving = True
while keepReceiving:
try:
data, fromAddr = sock.recvfrom(2048)
if data:
newestData = data
except socket.error as why:
if why.args[0] == EWOULDBLOCK:
keepReceiving = False
else:
raise why
if (newestData):
# code to handle/parse (newestData) here
发布者的数据进入缓冲区,较慢的订阅者以先进先出的方式逐条读取缓冲区。为了使其成为后进先出(也许这个描述不准确,因为在你的情况下我们只关心最后的数据),你可以用 asyncio
create_datagram_endpoint()
修改 UDP 协议所以订阅者清除当前缓冲区队列并从新队列接收最新数据。
下面是一个例子,我在macOS 11.4上用Python 3.9.6.
测试过
UdpProtocol.py为自定义的UDP协议对象。
import asyncio
class UdpProtocol:
def __init__(self):
self.packets = asyncio.Queue()
def connection_made(self, transport):
print("connection made")
def datagram_received(self, data, addr):
# clear the current queue and the accumulated data
self.packets._queue.clear()
# put latest data to the queue
self.packets.put_nowait((data, addr))
def connection_lost(self, transport):
print("connection lost")
def error_received(self, exc):
pass
async def recvfrom(self):
# get data from the queue
return await self.packets.get()
这是发布者。
import asyncio
from UdpProtocol import UdpProtocol
async def main():
server_address = ("127.0.0.1", 8000)
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
UdpProtocol, local_addr=None, remote_addr=server_address)
idx = 0
while True:
transport.sendto(str(idx).encode(), server_address)
print(idx)
idx += 1
await asyncio.sleep(0.1)
if __name__ == "__main__":
asyncio.run(main())
这是订阅者。
import asyncio
from UdpProtocol import UdpProtocol
async def main():
server_address = ("127.0.0.1", 8000)
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
UdpProtocol, local_addr=server_address, remote_addr=None)
while True:
data, addr = await protocol.recvfrom()
print(data, addr)
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
我使用了[这里] (Buffer size for reading UDP packets in Python) 的想法并调整了 python 套接字中的接收缓冲区。线程中还讨论了一些额外的要点。
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
这对我的应用程序很有效。至于我的应用程序,我将数据从 LabVIEW 发送到 python 并且数据不应累积。在我的情况下,丢失数据包不会影响我的应用程序,所以我基本上减少了接收缓冲区并使套接字超时。
我正在尝试将数据从 C++ 代码连续发送到 python 代码。我使用 udp 套接字发送数据。发送速率比接收速率更快,因为它是一个简单的传感器代码。所以发送的数据都是在socket中累积的。当我尝试读取数据时,它 returns 是一个旧数据。如何在发送新数据时从socket读取最新数据或删除旧数据?
How can I read the newest data from the socket or delete the old data when the new data is sent?
从套接字中读取一个数据包并将其放入缓冲区。继续从套接字中读取数据包,每次都将每个数据包放入缓冲区(替换之前缓冲区中的任何数据包数据),直到没有更多数据可供读取 - non-blocking-I/O 模式对此很有用,作为非阻塞 recv()
将抛出一个 socket.error 异常,代码为 EWOULDBLOCK
当你 运行 套接字的传入数据缓冲区中的数据不足时。读取完所有数据后,缓冲区中剩下的就是最新数据,因此请继续使用该数据。
Sketch/example 代码如下(未经测试,可能包含错误):
sock = socket.socket(family, socket.SOCK_DGRAM)
[... bind socket, etc... ]
# Receive-UDP-data event-loop begins here
sock.setblocking(False)
while True:
newestData = None
keepReceiving = True
while keepReceiving:
try:
data, fromAddr = sock.recvfrom(2048)
if data:
newestData = data
except socket.error as why:
if why.args[0] == EWOULDBLOCK:
keepReceiving = False
else:
raise why
if (newestData):
# code to handle/parse (newestData) here
发布者的数据进入缓冲区,较慢的订阅者以先进先出的方式逐条读取缓冲区。为了使其成为后进先出(也许这个描述不准确,因为在你的情况下我们只关心最后的数据),你可以用 asyncio
create_datagram_endpoint()
修改 UDP 协议所以订阅者清除当前缓冲区队列并从新队列接收最新数据。
下面是一个例子,我在macOS 11.4上用Python 3.9.6.
测试过UdpProtocol.py为自定义的UDP协议对象。
import asyncio
class UdpProtocol:
def __init__(self):
self.packets = asyncio.Queue()
def connection_made(self, transport):
print("connection made")
def datagram_received(self, data, addr):
# clear the current queue and the accumulated data
self.packets._queue.clear()
# put latest data to the queue
self.packets.put_nowait((data, addr))
def connection_lost(self, transport):
print("connection lost")
def error_received(self, exc):
pass
async def recvfrom(self):
# get data from the queue
return await self.packets.get()
这是发布者。
import asyncio
from UdpProtocol import UdpProtocol
async def main():
server_address = ("127.0.0.1", 8000)
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
UdpProtocol, local_addr=None, remote_addr=server_address)
idx = 0
while True:
transport.sendto(str(idx).encode(), server_address)
print(idx)
idx += 1
await asyncio.sleep(0.1)
if __name__ == "__main__":
asyncio.run(main())
这是订阅者。
import asyncio
from UdpProtocol import UdpProtocol
async def main():
server_address = ("127.0.0.1", 8000)
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
UdpProtocol, local_addr=server_address, remote_addr=None)
while True:
data, addr = await protocol.recvfrom()
print(data, addr)
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
我使用了[这里] (Buffer size for reading UDP packets in Python) 的想法并调整了 python 套接字中的接收缓冲区。线程中还讨论了一些额外的要点。
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
这对我的应用程序很有效。至于我的应用程序,我将数据从 LabVIEW 发送到 python 并且数据不应累积。在我的情况下,丢失数据包不会影响我的应用程序,所以我基本上减少了接收缓冲区并使套接字超时。