强制非阻塞 UDP 套接字在 sendto 上引发 BlockingIOError
Force a non-blocking UDP socket to raise BlockingIOError on sendto
我相信非阻塞 UDP 套接字可以在 sendto 上引发 BlockingIOError。我想强制这种情况来测试我的程序在这种情况下的行为。
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
sock.setblocking(False)
sock.bind(('', 7777))
...
# Even if repeating in a loop, doesn't seem to raise a BlockingIOError
sock.sendto(b'abcde', ('1.2.3.4', 61908))
我已经尝试通过
将传出缓冲区设置得较小
buffer_size = 5
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, buffer_size)
但它要么似乎没有影响,要么导致 OSError: [Errno 40] Message too long
如果缓冲区小于数据。
我的信念是错误的吗:它 永远不会 引发 BlockingIOError 吗?或者如果可以,我该如何强制执行?
我的目标是进行集成式测试:我想要真正的套接字并进行实际的网络调用。在这种情况下为单元式测试模拟套接字并不理想。
套接字只能在单线程 Python asyncio 程序中使用。
来自 TCP/IP C 中的套接字:程序员实用指南,Michael J. Donahoo,Kenneth L. Calvert
For UDP sockets, there are no send buffers, so send()
and sendto()
never return EWOULDBLOCK
从 https://linux.die.net/man/2/sendto 中可以看出为什么...
Packets are just silently dropped when a device queue overflows.
所以在 Python 条款中,BlockingIOError
不能提高,至少在 Linux.
我相信非阻塞 UDP 套接字可以在 sendto 上引发 BlockingIOError。我想强制这种情况来测试我的程序在这种情况下的行为。
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
sock.setblocking(False)
sock.bind(('', 7777))
...
# Even if repeating in a loop, doesn't seem to raise a BlockingIOError
sock.sendto(b'abcde', ('1.2.3.4', 61908))
我已经尝试通过
将传出缓冲区设置得较小buffer_size = 5
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, buffer_size)
但它要么似乎没有影响,要么导致 OSError: [Errno 40] Message too long
如果缓冲区小于数据。
我的信念是错误的吗:它 永远不会 引发 BlockingIOError 吗?或者如果可以,我该如何强制执行?
我的目标是进行集成式测试:我想要真正的套接字并进行实际的网络调用。在这种情况下为单元式测试模拟套接字并不理想。
套接字只能在单线程 Python asyncio 程序中使用。
来自 TCP/IP C 中的套接字:程序员实用指南,Michael J. Donahoo,Kenneth L. Calvert
For UDP sockets, there are no send buffers, so
send()
andsendto()
never returnEWOULDBLOCK
从 https://linux.die.net/man/2/sendto 中可以看出为什么...
Packets are just silently dropped when a device queue overflows.
所以在 Python 条款中,BlockingIOError
不能提高,至少在 Linux.