在处理双端队列时接收 UDP 消息
Receiving UDP message while processing a deque
为了解决 this,我试图简化问题。假设我有一个接收方同时监听 TCP 和 UDP 消息。它将接收几个字符串,将它们附加到双端队列,在收到 "finish"
消息后,它将开始处理双端队列。
如果我收到一个UDP报文,我需要停止处理,移除deque的最后一项,然后继续处理。
from collections import deque
host = commands.getoutput("hostname -I")
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()
def read_tcp(s):
conn, addr = s.accept()
print('Connected with', *addr)
while 1:
data = conn.recv(BUFFER_SIZE)
if not data: break
print "received data:", data
conn.send(data) # echo
conn.close()
if (data == 'finish'):
processP(q)
else:
q.append(data)
def read_udp(s):
data,addr = s.recvfrom(1024)
print("received message:", data)
del q[-1]
processP(q):
text = q.popleft()
textReverse = text[::-1]
print(textReverse)
def run():
# create tcp socket
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
tcp.bind((host,port))
except socket.error as err:
print('Bind failed', err)
return
tcp.listen(1)
# create udp socket
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
udp.bind((host,port))
print('***Socket now listening at***:', host, port)
input = [tcp,udp]
try:
while True:
inputready,outputready,exceptready = select(input,[],[])
for s in inputready:
if s == tcp:
read_tcp(s)
elif s == udp:
read_udp(s)
else:
print("unknown socket:", s)
# Hit Break / Ctrl-C to exit
except KeyboardInterrupt:
print('\nClosing')
raise
tcp.close()
udp.close()
if __name__ == '__main__':
run()
我在收到 UDP 消息后暂停程序然后返回处理阶段时遇到问题。现在,如果在处理过程中将 UDP 消息发送到我的程序,它将在处理结束之前不会收到消息(然后双端队列为空)。我认为线程或多处理可能会有所帮助,但我不知道如何将它们应用到代码中。
没有人强迫你清空dequeue。您可以在下一个工作负载出列之前检查 UDP 消息是否已到达。这就是您可以使用线程获得的最大程度,因为它们不允许您中断任意代码。它们总是只能合作终止。
如果您的单个项目处理时间太长,那么工作项目的多处理是一种选择,因为您可以杀死一个外部进程。
在继续处理下一个工作负载之前,使用 select.select
在短时间内检查套接字上的传入数据。或者,您可以使用等待线程输入的线程并操作出列。
EDIT 这是您的代码,用于处理 python3、select.select 和超时。触发 read_udp 与带有 echo foo | nc -4 -u localhost 5005
的 netcat 一起工作,但随后会触发异常,因为您假设出列中存在元素 - 这是一个应用程序逻辑问题,与如何交错监听和工作的问题无关。
import socket
import select
from collections import deque
host = "localhost"
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()
def read_tcp(s):
conn, addr = s.accept()
print('Connected with', *addr)
while 1:
data = conn.recv(BUFFER_SIZE)
if not data: break
print("received data:", data)
conn.send(data) # echo
conn.close()
if (data == 'finish'):
processP(q)
else:
q.append(data)
def read_udp(s):
data,addr = s.recvfrom(1024)
print("received message:", data)
del q[-1]
def processP(q):
text = q.popleft()
textReverse = text[::-1]
print(textReverse)
def run():
# create tcp socket
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
tcp.bind((host,port))
except socket.error as err:
print('Bind failed', err)
return
tcp.listen(1)
# create udp socket
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
udp.bind((host,port))
print('***Socket now listening at***:', host, port)
input = [tcp,udp]
try:
while True:
print("select.select")
inputready,outputready,exceptready = select.select(input,[],[], 0.1)
for s in inputready:
if s == tcp:
read_tcp(s)
elif s == udp:
read_udp(s)
else:
print("unknown socket:", s)
# Hit Break / Ctrl-C to exit
except KeyboardInterrupt:
print('\nClosing')
raise
tcp.close()
udp.close()
if __name__ == '__main__':
run()
为了解决 this,我试图简化问题。假设我有一个接收方同时监听 TCP 和 UDP 消息。它将接收几个字符串,将它们附加到双端队列,在收到 "finish"
消息后,它将开始处理双端队列。
如果我收到一个UDP报文,我需要停止处理,移除deque的最后一项,然后继续处理。
from collections import deque
host = commands.getoutput("hostname -I")
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()
def read_tcp(s):
conn, addr = s.accept()
print('Connected with', *addr)
while 1:
data = conn.recv(BUFFER_SIZE)
if not data: break
print "received data:", data
conn.send(data) # echo
conn.close()
if (data == 'finish'):
processP(q)
else:
q.append(data)
def read_udp(s):
data,addr = s.recvfrom(1024)
print("received message:", data)
del q[-1]
processP(q):
text = q.popleft()
textReverse = text[::-1]
print(textReverse)
def run():
# create tcp socket
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
tcp.bind((host,port))
except socket.error as err:
print('Bind failed', err)
return
tcp.listen(1)
# create udp socket
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
udp.bind((host,port))
print('***Socket now listening at***:', host, port)
input = [tcp,udp]
try:
while True:
inputready,outputready,exceptready = select(input,[],[])
for s in inputready:
if s == tcp:
read_tcp(s)
elif s == udp:
read_udp(s)
else:
print("unknown socket:", s)
# Hit Break / Ctrl-C to exit
except KeyboardInterrupt:
print('\nClosing')
raise
tcp.close()
udp.close()
if __name__ == '__main__':
run()
我在收到 UDP 消息后暂停程序然后返回处理阶段时遇到问题。现在,如果在处理过程中将 UDP 消息发送到我的程序,它将在处理结束之前不会收到消息(然后双端队列为空)。我认为线程或多处理可能会有所帮助,但我不知道如何将它们应用到代码中。
没有人强迫你清空dequeue。您可以在下一个工作负载出列之前检查 UDP 消息是否已到达。这就是您可以使用线程获得的最大程度,因为它们不允许您中断任意代码。它们总是只能合作终止。
如果您的单个项目处理时间太长,那么工作项目的多处理是一种选择,因为您可以杀死一个外部进程。
在继续处理下一个工作负载之前,使用 select.select
在短时间内检查套接字上的传入数据。或者,您可以使用等待线程输入的线程并操作出列。
EDIT 这是您的代码,用于处理 python3、select.select 和超时。触发 read_udp 与带有 echo foo | nc -4 -u localhost 5005
的 netcat 一起工作,但随后会触发异常,因为您假设出列中存在元素 - 这是一个应用程序逻辑问题,与如何交错监听和工作的问题无关。
import socket
import select
from collections import deque
host = "localhost"
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()
def read_tcp(s):
conn, addr = s.accept()
print('Connected with', *addr)
while 1:
data = conn.recv(BUFFER_SIZE)
if not data: break
print("received data:", data)
conn.send(data) # echo
conn.close()
if (data == 'finish'):
processP(q)
else:
q.append(data)
def read_udp(s):
data,addr = s.recvfrom(1024)
print("received message:", data)
del q[-1]
def processP(q):
text = q.popleft()
textReverse = text[::-1]
print(textReverse)
def run():
# create tcp socket
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
tcp.bind((host,port))
except socket.error as err:
print('Bind failed', err)
return
tcp.listen(1)
# create udp socket
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
udp.bind((host,port))
print('***Socket now listening at***:', host, port)
input = [tcp,udp]
try:
while True:
print("select.select")
inputready,outputready,exceptready = select.select(input,[],[], 0.1)
for s in inputready:
if s == tcp:
read_tcp(s)
elif s == udp:
read_udp(s)
else:
print("unknown socket:", s)
# Hit Break / Ctrl-C to exit
except KeyboardInterrupt:
print('\nClosing')
raise
tcp.close()
udp.close()
if __name__ == '__main__':
run()