在处理双端队列时接收 UDP 消息

Receiving UDP message while processing a deque

为了解决 this,我试图简化问题。假设我有一个接收方同时监听 TCP 和 UDP 消息。它将接收几个字符串,将它们附加到双端队列,在收到 "finish" 消息后,它将开始处理双端队列。

如果我收到一个UDP报文,我需要停止处理,移除deque的最后一项,然后继续处理。

from collections import deque

host = commands.getoutput("hostname -I")
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()


def read_tcp(s):
    conn, addr = s.accept()
    print('Connected with', *addr)
    while 1:
        data = conn.recv(BUFFER_SIZE)
        if not data: break
        print "received data:", data
        conn.send(data)  # echo
    conn.close()
    if (data == 'finish'):
        processP(q)
    else:
        q.append(data)

def read_udp(s):
    data,addr = s.recvfrom(1024)
    print("received message:", data)
    del q[-1]


processP(q):
    text = q.popleft()
    textReverse = text[::-1]
    print(textReverse)

def run():
    # create tcp socket
    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        tcp.bind((host,port))
    except socket.error as err:
        print('Bind failed', err)
        return
    tcp.listen(1)
    # create udp socket
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
    udp.bind((host,port))
    print('***Socket now listening at***:', host, port)
    input = [tcp,udp]
    try:
        while True:
            inputready,outputready,exceptready = select(input,[],[])
            for s in inputready:
                if s == tcp:
                    read_tcp(s)
                elif s == udp:
                    read_udp(s)
                else:
                    print("unknown socket:", s)
    # Hit Break / Ctrl-C to exit
    except KeyboardInterrupt:
        print('\nClosing')
        raise
    tcp.close()
    udp.close()

if __name__ == '__main__':
    run()

我在收到 UDP 消息后暂停程序然后返回处理阶段时遇到问题。现在,如果在处理过程中将 UDP 消息发送到我的程序,它将在处理结束之前不会收到消息(然后双端队列为空)。我认为线程或多处理可能会有所帮助,但我不知道如何将它们应用到代码中。

没有人强迫你清空dequeue。您可以在下一个工作负载出列之前检查 UDP 消息是否已到达。这就是您可以使用线程获得的最大程度,因为它们不允许您中断任意代码。它们总是只能合作终止。

如果您的单个项目处理时间太长,那么工作项目的多处理是一种选择,因为您可以杀死一个外部进程。

在继续处理下一个工作负载之前,使用 select.select 在短时间内检查套接字上的传入数据。或者,您可以使用等待线程输入的线程并操作出列。

EDIT 这是您的代码,用于处理 python3、select.select 和超时。触发 read_udp 与带有 echo foo | nc -4 -u localhost 5005 的 netcat 一起工作,但随后会触发异常,因为您假设出列中存在元素 - 这是一个应用程序逻辑问题,与如何交错监听和工作的问题无关。

import socket
import select
from collections import deque

host = "localhost"
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()


def read_tcp(s):
    conn, addr = s.accept()
    print('Connected with', *addr)
    while 1:
        data = conn.recv(BUFFER_SIZE)
        if not data: break
        print("received data:", data)
        conn.send(data)  # echo
    conn.close()
    if (data == 'finish'):
        processP(q)
    else:
        q.append(data)

def read_udp(s):
    data,addr = s.recvfrom(1024)
    print("received message:", data)
    del q[-1]


def processP(q):
    text = q.popleft()
    textReverse = text[::-1]
    print(textReverse)

def run():
    # create tcp socket
    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        tcp.bind((host,port))
    except socket.error as err:
        print('Bind failed', err)
        return
    tcp.listen(1)
    # create udp socket
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
    udp.bind((host,port))
    print('***Socket now listening at***:', host, port)
    input = [tcp,udp]
    try:
        while True:
            print("select.select")
            inputready,outputready,exceptready = select.select(input,[],[], 0.1)
            for s in inputready:
                if s == tcp:
                    read_tcp(s)
                elif s == udp:
                    read_udp(s)
                else:
                    print("unknown socket:", s)
    # Hit Break / Ctrl-C to exit
    except KeyboardInterrupt:
        print('\nClosing')
        raise
    tcp.close()
    udp.close()

if __name__ == '__main__':
    run()