永远不会到达最后一个线程

Last thread is never reached

首先我将解释我的代码是如何工作的。我有 3 个相互交互的模块:其中 2 个模块通过套接字连接到第三个模块并向它发送 UDP 帧。第三个模块接收 UDP 帧,将它们保存到队列中,然后由另一个函数以队列作为输入进行一些处理。

我在单独的终端中运行发送 UDP 帧的模块。我想让接收 UDP 帧的函数和对已保存的帧进行处理的函数运行在不同的线程中。为此,我使用了 threading 和 queue 包。但是我没能让所有线程一起运行;程序总是卡在第二个线程,永远到不了最后一个线程。

这是我的代码:
send_1.py:

import socket
import pickle
import time
def send_frame():
    """Send a pickled coordinate dict to 127.0.0.1:5005 over UDP every 5 seconds.

    The target address and the payload are printed once before sending
    starts. The loop never ends on its own; the socket is closed when the
    loop is left through an exception (for example ``KeyboardInterrupt``).
    """
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5005
    payload = pickle.dumps({'x': 0.20, 'y': 0.2, 'z': 0.2})
    print(type(payload))

    print("UDP target IP:", UDP_IP)
    print("UDP target port:", UDP_PORT)
    print("message:", payload)

    # The context manager releases the socket even when the loop is interrupted.
    with socket.socket(socket.AF_INET,  # Internet
                       socket.SOCK_DGRAM) as sock:  # UDP
        while True:
            sock.sendto(payload, (UDP_IP, UDP_PORT))
            time.sleep(5)


# Start sending only when executed as a script, so that importing this
# module does not enter the endless send loop.
if __name__ == "__main__":
    send_frame()

send_2.py:

import socket
import pickle
import time


def send_frame():
    """Send a pickled coordinate dict to 127.0.0.1:5006 over UDP every 5 seconds.

    Prints the payload type, the target address and the payload bytes once,
    then sends the same datagram in an endless loop. The socket is closed
    when the loop is left through an exception such as ``KeyboardInterrupt``.
    """
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5006
    payload = pickle.dumps({'x': 2.20, 'y': 2.2, 'z': 2.2})
    print(type(payload))

    print("UDP target IP:", UDP_IP)
    print("UDP target port:", UDP_PORT)
    print("message:", payload)

    # ``with`` guarantees the UDP socket is closed on any exit from the loop.
    with socket.socket(socket.AF_INET,  # Internet
                       socket.SOCK_DGRAM) as sock:  # UDP
        while True:
            sock.sendto(payload, (UDP_IP, UDP_PORT))
            time.sleep(5)


# Guard the entry point: the send loop should run only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    send_frame()

这是接收帧、将它们保存到队列然后进行处理的代码。
receive.py:

import threading
import queue
import socket
import pickle
import time


class ReceiveData1:
    """Receive pickled frames on UDP 127.0.0.1:5005 and queue them with a timestamp."""

    def __init__(self):
        pass

    def receive_frame(self, q_1):
        """Receive datagrams forever and put each decoded frame on ``q_1``.

        Every queued item is a dict ``{'data': <unpickled payload>,
        'timestamp': <time.time() taken after receipt>}``. The loop has no
        exit condition, so this call only ends through an exception; the
        socket is closed in that case.

        Args:
            q_1: Object with a ``put`` method (a ``queue.Queue`` in this file).
        """
        UDP_IP = "127.0.0.1"
        UDP_PORT = 5005

        # The context manager closes the socket if binding or receiving fails.
        with socket.socket(socket.AF_INET,  # Internet
                           socket.SOCK_DGRAM) as sock:  # UDP
            sock.bind((UDP_IP, UDP_PORT))

            while True:
                data, _addr = sock.recvfrom(1024)  # buffer size is 1024 bytes
                # NOTE(security): pickle.loads can execute arbitrary code. This is
                # only acceptable while the sender is a trusted local process.
                data_1 = pickle.loads(data)
                print('data_1:', data_1)

                q_1.put({'data': data_1, 'timestamp': time.time()})


class ReceiveData2:
    """Receive pickled frames on UDP 127.0.0.1:5006 and queue them with a timestamp."""

    def __init__(self):
        pass

    def receive_frame(self, q_2):
        """Receive datagrams forever and put each decoded frame on ``q_2``.

        Each received datagram is unpickled, printed, and queued as
        ``{'data': <payload>, 'timestamp': <time.time()>}``. The loop never
        ends normally; if an exception leaves it, the socket is closed.

        Args:
            q_2: Object with a ``put`` method (a ``queue.Queue`` in this file).
        """
        UDP_IP = "127.0.0.1"
        UDP_PORT = 5006

        # ``with`` ensures the bound port is released on any exit from the loop.
        with socket.socket(socket.AF_INET,  # Internet
                           socket.SOCK_DGRAM) as sock:  # UDP
            sock.bind((UDP_IP, UDP_PORT))

            while True:
                data, _addr = sock.recvfrom(1024)  # buffer size is 1024 bytes
                # NOTE(security): unpickling network data can run arbitrary code;
                # keep this only for trusted, local senders.
                data_2 = pickle.loads(data)
                print('data_2:', data_2)

                q_2.put({'data': data_2, 'timestamp': time.time()})


class MatchFrames:
    """Decide whether two timestamped frames agree in x and y within tolerances."""

    def __init__(self, delta_x, delta_y):
        # Maximum allowed absolute difference between the frames' 'x' / 'y' values.
        self.delta_x = delta_x
        self.delta_y = delta_y

    def get_decision(self, queue_1, queue_2):
        """Take one frame from each queue, then print and return the decision.

        Blocks until both queues deliver an item. Each item must be a dict
        with a ``'data'`` dict (holding ``'x'`` and ``'y'``) and a
        ``'timestamp'`` in ``time.time()`` seconds.

        Args:
            queue_1: Queue providing the first frame via ``get()``.
            queue_2: Queue providing the second frame via ``get()``.

        Returns:
            ``'Unknown'`` if either frame is 3 or more seconds old,
            ``'Correct'`` if both x and y of frame 1 lie within the configured
            deltas of frame 2, otherwise ``'Wrong'``.
        """
        print('queue_1:', queue_1)
        print('queue_2:', queue_2)
        frame_1 = queue_1.get()
        frame_2 = queue_2.get()
        data_1 = frame_1['data']
        data_2 = frame_2['data']

        decision = 'Unknown'
        now = time.time()
        # Only frames younger than 3 seconds are compared. This is a plain
        # condition: the earlier ``while`` broke out on both branches and so
        # never iterated.
        if now < frame_1['timestamp'] + 3 and now < frame_2['timestamp'] + 3:
            x_matches = (data_2['x'] - self.delta_x
                         <= data_1['x']
                         <= data_2['x'] + self.delta_x)
            y_matches = (data_2['y'] - self.delta_y
                         <= data_1['y']
                         <= data_2['y'] + self.delta_y)
            decision = 'Correct' if x_matches and y_matches else 'Wrong'
        print(decision)
        return decision


if __name__ == '__main__':
    q_1 = queue.Queue()
    q_2 = queue.Queue()
    rec_1 = ReceiveData1()
    rec_2 = ReceiveData2()
    decide = MatchFrames(0.5, 0.5)

    # Daemon threads: they are terminated when the main thread exits.
    threads = [
        threading.Thread(target=rec_1.receive_frame, args=(q_1,), daemon=True),
        threading.Thread(target=rec_2.receive_frame, args=(q_2,), daemon=True),
        threading.Thread(target=decide.get_decision, args=(q_1, q_2), daemon=True),
    ]
    # Start every thread before joining any of them, so all three run concurrently.
    for t in threads:
        t.start()

    # Joining keeps the main thread (and with it the daemon threads) alive.
    # Queue.join() is deliberately not called: no consumer calls task_done(),
    # so it would block forever as soon as a frame had been queued.
    for t in threads:
        t.join()

据我了解,可能是因为 join():调用 thread.join() 后,下一个线程要等到前一个线程结束才能运行,而这永远不会发生,因为前一个线程会卡在 while 循环中。

关于如何让三个线程同时运行并持续接收 UDP 帧,有什么建议吗?

我使用 asyncio 设法解决了这个问题。

我对代码做了一些修改,现在只有函数,没有类。
所以代码如下:

send_1.py

import socket
import pickle
import time
def send_frame():
    """Repeatedly send one pickled coordinate dict to 127.0.0.1:5005 via UDP.

    Prints the payload type, target address and payload once, then sends the
    datagram every 5 seconds without end. If an exception (for example
    ``KeyboardInterrupt``) leaves the loop, the socket is closed.
    """
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5005
    payload = pickle.dumps({'x': 0.20, 'y': 0.2, 'z': 0.2})
    print(type(payload))

    print("UDP target IP:", UDP_IP)
    print("UDP target port:", UDP_PORT)
    print("message:", payload)

    # Use the socket as a context manager so it is always released.
    with socket.socket(socket.AF_INET,  # Internet
                       socket.SOCK_DGRAM) as sock:  # UDP
        while True:
            sock.sendto(payload, (UDP_IP, UDP_PORT))
            time.sleep(5)


# Run the sender only when this file is executed as a script; a plain
# import must not start the endless loop.
if __name__ == "__main__":
    send_frame()

send_2.py

import socket
import pickle
import time


def send_frame():
    """Repeatedly send one pickled coordinate dict to 127.0.0.1:5006 via UDP.

    The payload type, the target address and the payload bytes are printed
    once. The datagram is then sent every 5 seconds in an endless loop; the
    socket is closed if an exception leaves that loop.
    """
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5006
    payload = pickle.dumps({'x': 2.20, 'y': 2.2, 'z': 2.2})
    print(type(payload))

    print("UDP target IP:", UDP_IP)
    print("UDP target port:", UDP_PORT)
    print("message:", payload)

    # Closing is handled by ``with``, including on Ctrl-C.
    with socket.socket(socket.AF_INET,  # Internet
                       socket.SOCK_DGRAM) as sock:  # UDP
        while True:
            sock.sendto(payload, (UDP_IP, UDP_PORT))
            time.sleep(5)


# Entry-point guard: importing this module should not begin sending.
if __name__ == "__main__":
    send_frame()

下面是接收帧、将它们保存到队列中然后进行处理的代码。receive.py:

import asyncio
import queue
import socket
import pickle
import time


async def receive_frame1(q_1):
    """Receive pickled frames on UDP 127.0.0.1:5005 and queue them with a timestamp.

    The blocking receive runs in the event loop's default executor, so other
    coroutines keep running while this one waits for a datagram. Every
    received frame is printed and put on ``q_1`` as
    ``{'data': <unpickled payload>, 'timestamp': <time.time()>}``.

    Args:
        q_1: Thread-safe queue with a ``put`` method (``queue.Queue`` here).
    """
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5005
    loop = asyncio.get_event_loop()

    def receive_one():
        """Wait in a worker thread for one datagram and enqueue it."""
        try:
            data, _addr = sock.recvfrom(1024)  # buffer size is 1024 bytes
        except socket.timeout:
            # Nothing arrived within the timeout; the caller simply retries.
            return
        # NOTE(security): pickle.loads can execute arbitrary code. Acceptable
        # only because the sender is expected to be a trusted local process.
        data_1 = pickle.loads(data)
        print('data_1:', data_1)
        # Enqueue from the worker thread so a consumer blocked on the queue is
        # served even while the event loop itself is busy.
        q_1.put({'data': data_1, 'timestamp': time.time()})

    with socket.socket(socket.AF_INET,  # Internet
                       socket.SOCK_DGRAM) as sock:  # UDP
        sock.bind((UDP_IP, UDP_PORT))
        # A finite timeout lets the worker thread return regularly instead of
        # sitting in recvfrom() indefinitely when no sender is running.
        sock.settimeout(1.0)

        while True:
            # Awaiting the executor future yields control to the event loop.
            await loop.run_in_executor(None, receive_one)


async def receive_frame2(q_2):
    """Receive pickled frames on UDP 127.0.0.1:5006 and queue them with a timestamp.

    ``recvfrom`` blocks, so it is executed in the event loop's default
    executor; the coroutine itself only awaits the result and therefore does
    not stall other coroutines. Each frame is printed and put on ``q_2`` as
    ``{'data': <unpickled payload>, 'timestamp': <time.time()>}``.

    Args:
        q_2: Thread-safe queue with a ``put`` method (``queue.Queue`` here).
    """
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5006
    loop = asyncio.get_event_loop()

    def receive_one():
        """Wait in a worker thread for one datagram and enqueue it."""
        try:
            data, _addr = sock.recvfrom(1024)  # buffer size is 1024 bytes
        except socket.timeout:
            # No datagram within the timeout; try again on the next call.
            return
        # NOTE(security): unpickling network data can run arbitrary code;
        # keep this only for trusted, local senders.
        data_2 = pickle.loads(data)
        print('data_2:', data_2)
        # Putting from the worker thread keeps the queue filled even if the
        # event loop is occupied by a blocking consumer.
        q_2.put({'data': data_2, 'timestamp': time.time()})

    with socket.socket(socket.AF_INET,  # Internet
                       socket.SOCK_DGRAM) as sock:  # UDP
        sock.bind((UDP_IP, UDP_PORT))
        # Bound the wait so the worker thread is never stuck in recvfrom()
        # indefinitely when nothing is being sent.
        sock.settimeout(1.0)

        while True:
            # The await hands control back to the event loop while waiting.
            await loop.run_in_executor(None, receive_one)


async def _next_frame(frame_queue, poll_interval=0.01):
    """Return the next item of a thread-safe queue without blocking the event loop."""
    while True:
        try:
            return frame_queue.get_nowait()
        except queue.Empty:
            # Nothing queued yet: let other coroutines run, then look again.
            await asyncio.sleep(poll_interval)


async def get_decision(queue_1, queue_2, delta_x, delta_y):
    """Continuously compare pairs of frames taken from two queues.

    For every pair (one frame per queue) the timestamps and the decision are
    printed. The decision is ``'Unknown'`` if either frame is 3 or more
    seconds old, ``'Correct'`` if x and y of frame 1 lie within ``delta_x`` /
    ``delta_y`` of frame 2, and ``'Wrong'`` otherwise. The coroutine never
    returns; it ends only when cancelled.

    Args:
        queue_1: ``queue.Queue`` of ``{'data': {...}, 'timestamp': float}`` items.
        queue_2: ``queue.Queue`` of ``{'data': {...}, 'timestamp': float}`` items.
        delta_x: Allowed absolute difference between the frames' ``'x'`` values.
        delta_y: Allowed absolute difference between the frames' ``'y'`` values.
    """
    while True:
        print('queue_1:', queue_1)
        print('queue_2:', queue_2)
        # A plain queue.get() would block the whole event loop while waiting.
        frame_1 = await _next_frame(queue_1)
        frame_2 = await _next_frame(queue_2)
        data_1 = frame_1['data']
        data_1_ts = frame_1['timestamp']
        data_2 = frame_2['data']
        data_2_ts = frame_2['timestamp']

        decision = 'Unknown'
        now = time.time()
        # Only frames younger than 3 seconds are compared (a condition, not a loop).
        if now < data_1_ts + 3 and now < data_2_ts + 3:
            x_matches = data_2['x'] - delta_x <= data_1['x'] <= data_2['x'] + delta_x
            y_matches = data_2['y'] - delta_y <= data_1['y'] <= data_2['y'] + delta_y
            decision = 'Correct' if x_matches and y_matches else 'Wrong'

        print('ts:', data_1_ts)
        print('ts2:', data_2_ts)
        print(decision)
        print('#' * 32)
        # Yield once per pair so a backlog of frames cannot starve other tasks.
        await asyncio.sleep(0)
        

if __name__ == '__main__':
    q_1 = queue.Queue()
    q_2 = queue.Queue()

    # Create the loop explicitly and schedule the tasks on it. Calling
    # asyncio.ensure_future() while no loop is running depends on the
    # implicit "current" event loop, which newer Python versions deprecate.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [
        loop.create_task(receive_frame1(q_1)),
        loop.create_task(receive_frame2(q_2)),
        loop.create_task(get_decision(q_1, q_2, 3.5, 3.5)),
    ]
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop this script.
        pass
    finally:
        # Cancel the tasks and let them unwind before closing the loop.
        for task in tasks:
            task.cancel()
        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        loop.close()

asyncio 帮助我以异步方式运行这些任务,并通过 loop.run_forever() 让它们持续运行。

一些资料帮助我理解了如何使用 asyncio:
1- Getting Started With Async Features in Python
2-