多线程 TCP 套接字

Multithreaded TCP socket

我正在尝试创建一个可以同时处理多个套接字请求的线程化 TCP 套接字服务器。

为了测试它,我在客户端启动了几个线程以查看我的服务器是否可以处理它。第一个套接字打印成功,但其他套接字打印为 [Errno 32] Broken pipe。 我不知道如何避免它。

import threading
import socketserver
import graphitesend


class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def handle(self):
        data = self.request.recv(1024)
        if data != "":
            print(data)

class ThreadedTCPServer(socketserver.ThreadingTCPServer):
    allow_reuse_address = True

    def __init__(self, host, port):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), ThreadedTCPRequestHandler)

    def stop(self):
        self.server_close()
        self.shutdown()

    def start(self):
        threading.Thread(target=self._on_started).start()

    def _on_started(self):
        self.serve_forever()

def client(g):
    g.send("test", 1)

if __name__ == "__main__":
    HOST, PORT = "localhost", 2003
    server = ThreadedTCPServer(HOST, PORT)
    server.start()
    g = graphitesend.init(graphite_server = HOST, graphite_port = PORT)
    threading.Thread(target = client, args=(g,)).start()
    threading.Thread(target = client, args=(g,)).start()
    threading.Thread(target = client, args=(g,)).start()
    threading.Thread(target = client, args=(g,)).start()
    threading.Thread(target = client, args=(g,)).start()
    threading.Thread(target = client, args=(g,)).start()
    threading.Thread(target = client, args=(g,)).start()
    server.stop()

要确定您到底期望发生什么有点困难,但我认为最直接的原因是您没有给您的客户时间 运行 在终止服务器之前。

当您构造一个 Thread 对象并调用其 start 方法时,您正在创建一个线程,并使其准备好 运行。然后它将被放置在您系统上的 "runnable" 任务队列中,但它将与您的主线程和所有其他线程(实际上是同一台机器上的所有其他任务)竞争 CPU 时间.

你的多个线程(主线程加上其他线程)也可能被 python 解释器的 GIL 序列化(Global Interpreter Lock——假设你使用的是 "standard" CPython)这意味着他们可能还没有 "out of the gate"。

但是在他们有机会发送任何东西之前,您就用 server_close() 关闭了服务器。这与 "Broken Pipe" 错误一致:您剩余的客户端正在尝试写入已被 "remote" 端关闭的套接字。

您应该在创建线程对象时收集它们并将它们放在一个列表中(以便您以后可以引用它们)。当您完成所有这些的创建和启动后,返回列表并在每个线程对象上调用 .join 方法。这将确保线程有机会完成。只有这样你才应该关闭服务器。像这样:

threads = []
for n in range(7):
    th = threading.Thread(target=client, args=(g,))
    th.start()
    threads.append(th)

# All threads created. Wait for them to finish.
for th in threads:
    th.join()

server.stop()

另一件需要注意的事情是,您的所有客户端都共享同一个连接以发送到服务器,因此您的服务器永远不会创建多个线程:就其而言,只有一个单一客户。如果您确实想要为每个客户端单独连接,您可能应该将 graphitesend.init 移动到客户端函数中。

(免责声明:我对 graphitesend 一无所知,除了我可以在 15 秒内看到 google 的第一个结果;我假设它基本上只是 TCP 的包装器连接。)