Python:存储和删除线程 to/from 列表

Python: Store and delete Threads to/from List

我想实现一个流式服务器,它向所有连接的客户端发送无尽的数据流。多个客户端应该能够连接和断开与服务器的连接,以便以不同的方式处理数据。

每个客户端都由一个专用的 ClientThread 提供服务,该客户端线程是 类 线程并包含要发送到客户端的数据队列(必需,因为客户端可能以不同的速度处理数据,并且因为突发可能会出现客户端可能无法处理的数据)。

该程序通过单独的 ClientHandlerThread 侦听传入的客户端连接。每当客户端连接时,ClientHandlerThread 都会生成一个 ClientThread 并将其添加到列表中。

作为一个虚拟示例,主线程每秒递增一个整数,并通过 ClientHandlerThread.push_item() 将其推送到所有 ClientThread 队列。

每增加 10 个,打印客户端队列中的项目数。


现在回答我的问题:

当客户端断开连接时,Thread 终止并且不再发送数据,但是,ClientThread 对象保留在客户端的 ClientHandlerThreads 列表中,并且项目不断被推送到其队列中。

因此,我正在寻找 (1) 每当客户端断开连接时从列表中删除 ClientThread 对象的方法,(2) 比列表更好的监视 ClientThreads 的方法,或 (3) 不同的 (更好)架构来实现我的目标。

非常感谢!


服务器

import socket
import time

from threading import Thread
from queue import Queue


class ClientThread(Thread):

    def __init__(self, conn, client_addr):
        Thread.__init__(self)

        self.queue = Queue()

        self.conn = conn
        self.client_addr = client_addr

    def run(self):
        print('Client connected')

        while True:
            try:
                self.conn.sendall(self.queue.get().encode('utf-8'))
                time.sleep(1)
            except BrokenPipeError:
                print('Client disconnected')
                break


class ClientHandlerThread(Thread):

    def __init__(self):
        Thread.__init__(self, daemon = True)

        self.clients = list()

    def push_item(self, item):
        for client in self.clients:
            client.queue.put(str(i))

    def run(self):

        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:

            s.bind('./socket')
            s.listen()

            i = 1

            while True:
                conn, client_addr = s.accept()

                client = ClientThread(conn, client_addr)
                client.start()
                self.clients.append(client)

                i += 1


if __name__ == '__main__':

    client_handler = ClientHandlerThread()
    client_handler.start()

    i = 1

    while True:
        client_handler.push_item(str(i))

        if i % 10 == 0:
            print('[' + ', '.join([str(client.queue.qsize()) for client in client_handler.clients]) + ']')


        i += 1
        time.sleep(1)

客户:

import socket

if __name__ == '__main__':
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect('./socket')

        print('Connected to server')

        while True:
            data = s.recv(1024)

            if not data:
                print('Disconnected from server')
                break

            print(data.decode('utf-8'))

注意 您可能应该阅读 aiohttp 之类的内容以获得更具可扩展性的答案版本。


对于你的问题,你可以做一些修改来实现:

首先,更改ClientThread的构造函数:

class ClientThread(Thread):

    def __init__(self, client_handler, conn, client_addr):
        self.client_handler = client_handler
        self.running = True
        ...

当处理程序创建对象时,它应该将 self 作为 client_handler 传递给它。

run method中,使用

   def run(self):
        while True:
            ...

        self.running = False
        self.client_handler.purge() 

也就是标记自己不是运行,调用handler的purge方法。这可以写成

class ClientHandlerThread(Thread):
    ...
    def purge(self):
        self.clients = [c for c in self.clients if c.running]