基于队列的通信:发送 return 队列是个好主意吗?

Queue-based communication: is it a good idea to send a return-queue?

我正在使用以下方法在另一个线程中等待异步操作的结果运行nig:

def client(worker_queue, message):
    answer_queue = queue.Queue(maxsize=1)
    worker_queue.put((answer_queue, message))
    result = answer_queue.get(timeout=10)


def worker():
    while True:
        answer_queue, message = worker_queue.get()
        result = do_someting_with(message)
        answer_queue.put(result)
        worker_queue.task_done()

(primitiv worker 只是一个示例。在其他情况下,可能需要在多个回调之间传递“answer_queue”)

这是个好主意,还是我会 运行 遇到问题(例如内存管理)?

有更好的方法吗?

我知道 asyncio 有类似 futures 的东西来处理这样的问题,但目前我正在寻找(也)可以多线程工作的东西。

Python 使用内存地址访问 class 变量及其方法。使用队列只是确保每个工作人员都有一个唯一的内存地址位置来放置其答案。

确保,如果变量仍然是局部变量,那么当您不再需要该变量时,您可以重新分配它,这样垃圾收集可以释放队列正在使用的内存,或者使用 del 清除内存手动执行关键字,例如 del answer_queue.

您可以使用任何数据类型来完成工作程序和客户端之间传输数据的角色,因为所有 class 方法都是通过内存地址访问的;然而,最常见的数据传输方式是:

使用队列

queue.Queue() 已经非常优化并且非常通用,因此它是在客户端和工作人员之间执行通信的可靠方式。

import queue
import datetime
from threading import Thread
def queue_client(worker_queue, message):
    answer_queue = queue.Queue(maxsize=1)
    worker_queue.put((answer_queue, message))
    result = answer_queue.get(timeout=60)
    print('Bytes received from worker:', len(result))

def queue_worker(worker_queue):
    answer_queue, message = worker_queue.get()
    do_something = lambda x: x[::-1]
    result = do_something(message)
    answer_queue.put(result)

if __name__ == '__main__':
    large_message = 'X' * (2<<30) #2GB

    worker_queue = queue.Queue()
    client = Thread(target=queue_client, args=(worker_queue, large_message,))
    worker = Thread(target=queue_worker, args=(worker_queue,))
    start_time = datetime.datetime.now()
    client.start()
    worker.start()
    client.join()
    worker.join()
    dt = datetime.datetime.now() - start_time
    print('Time elapsed using Queue:', dt.total_seconds()) #~1.47 secs on my machine

共享内存类型

由于在多线程时可以直接访问变量地址,因此可以使用共享内存;这通常是访问数据的最快方式,但您需要某种类型的数据容器对象。

一个非常基本的例子(worker 产卵、答案提交等可以添加到管理器):

import datetime
from threading import Thread

class DataManager:

    def __init__(self,):
        self.tasks = {
            #worker_id: message
        }
        self.answers = {
            #worker_id: answer
        }

def shared_client(datamanager, worker_id, message):
    datamanager.tasks[worker_id] = message
    while not datamanager.answers.get(worker_id, None): #Wait for answer
        pass
    result = datamanager.answers[worker_id]
    print('Bytes received from worker:', len(result))
    

def shared_worker(data_manager, worker_id):
    while not data_manager.tasks.get(worker_id, None): #Wait for task to get assigned
        pass

    message = data_manager.tasks[worker_id]
    do_something = lambda x: x[::-1]
    result = do_something(message)
    data_manager.answers[worker_id] = result

if __name__ == '__main__':
    large_message = 'X' * (2<<30) #2GB
    data_manager = DataManager()

    worker_id = 0
    client = Thread(
        target=shared_client, 
        args=(data_manager, worker_id, large_message,)
    )
    worker = Thread(target=shared_worker, args=(data_manager, worker_id))
    start_time = datetime.datetime.now()
    client.start()
    worker.start()
    client.join()
    worker.join()
    dt = datetime.datetime.now() - start_time
    print('Time elapsed using Shared Memory:', dt.total_seconds()) #~1.44 secs on my machine

如果您使用共享内存 types/ctypes,例如 Multiprocessing.Value 和 Multiprocessing.Array (link).[=18=,可能会有更优雅(甚至可能更快)的解决方案可用]

管道

管道充当连接,一端监听,另一端发送数据。 重要的是,当发送大于 ~32MB 的数据包时,这比队列更有利,而 multiprocessing。此外,它是可序列化的,不像 queue.Queue().
使用管道的示例:

import datetime
from threading import Thread
from multiprocessing import Pipe

def client(conn, message):
    conn.send(message)      #Send message to worker
    result = conn.recv()    #Receive result
    print('Bytes received from worker:', len(result))

def worker(conn):
    message = conn.recv() #Get message from client
    do_something = lambda x: x[::-1]
    result = do_something(message)
    conn.send(result) #Send result to client

if __name__ == '__main__':
    large_message = 'X' * (2<<30) #2GB
    client_conn, worker_conn = Pipe(duplex=True) #Bidirectional pipe
    client_process = Thread(target=client, args=(client_conn, large_message))
    worker_process = Thread(target=worker, args=(worker_conn,))
    
    start_time = datetime.datetime.now()
    client_process.start()
    worker_process.start()

    client_process.join()
    worker_process.join()
    dt = datetime.datetime.now() - start_time

    print('Time elapsed using Pipe:', dt.total_seconds()) #~9.07 secs on my machine

代理

代理在客户端注册函数,并允许通过调用底层公开函数的工作人员执行它们。当您的工作人员与您的客户在不同的计算机上时,这要复杂得多,但对于集群计算来说是必要的。