如何防止我的服务器脚本中的管道损坏错误

How to prevent Brokenpipe error in my server script

这是我的服务器脚本,用于接收和向所有客户端发送消息

import socket
import threading

HEADER = 64
PORT = 5050
SERVER = socket.gethostbyname(socket.gethostname())
ADDR = (SERVER, PORT)
FORMAT = 'utf-8'
DISCONNECT_MESSAGE = "!DISCONNECT"

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(ADDR)

clients = set()
clients_lock = threading.Lock()

def handle_client(conn, addr):
    name = conn.recv(HEADER).decode(FORMAT)
    if name:
        name = int(name)
        msg_name = conn.recv(name).decode(FORMAT)
    print(f"[NEW CONNECTION] {msg_name} connected.")
    connection_message = f"{msg_name} connected."
    with clients_lock:
        for c in clients:
            if c != conn:
                message = connection_message.encode(FORMAT)
                msg_length = len(message)
                send_length = str(msg_length).encode(FORMAT)
                send_length += b' ' * (HEADER - len(send_length))
                c.sendall(send_length)
                c.sendall(message)

    with clients_lock:
        clients.add(conn)

    connected = True
    try:
        while connected:
            msg_length = conn.recv(HEADER).decode(FORMAT)
            if msg_length:
                msg_length = int(msg_length)
                msg1 = conn.recv(msg_length).decode(FORMAT)
                msg = f"{msg_name}: {msg1}"
                if msg1 == DISCONNECT_MESSAGE:
                    connected = False
                print(f"{msg}")
                with clients_lock:
                    for c in clients:
                        if c != conn:
                            message = msg.encode(FORMAT)
                            msg_length = len(message)
                            send_length = str(msg_length).encode(FORMAT)
                            send_length += b' ' * (HEADER - len(send_length))
                            c.sendall(send_length)
                            c.sendall(message)
                msg = f"You: {msg1}"
                message = msg.encode(FORMAT)
                msg_length = len(message)
                send_length = str(msg_length).encode(FORMAT)
                send_length += b' ' * (HEADER - len(send_length))
                conn.send(send_length)
                conn.send(message)

    finally:
        with clients_lock:
            clients.remove(conn)
            conn.close()

def start():
    server.listen()
    print(f"[LISTENING] Server is listening on {SERVER}")
    while True:
        conn, addr = server.accept()
        thread = threading.Thread(target=handle_client, args=(conn, addr))
        thread.daemon = True
        thread.start()
        print(f"[ACTIVE CONNECTIONS] {threading.activeCount() - 1}")

print("[STARTING] server is starting...")
start()

问题是,如果服务器在未发送 DISCONNECT_MESSAGE 的情况下断开连接,即我在程序完成之前强行关闭它,我会收到 Brokenpipe 错误。

这是客户端脚本:

import socket
import threading
import tkinter as tk

def returnname():
    def receiving():
        receiving = True
        while receiving:
            msg_length = client.recv(HEADER).decode(FORMAT)
            if msg_length:
                msg_length = int(msg_length)
                msg = client.recv(msg_length).decode(FORMAT)
                TEXTAREA.insert("end", msg)
                TEXTAREA.see("end")

    def send(msg):
        message = msg.encode(FORMAT)
        msg_length = len(message)
        send_length = str(msg_length).encode(FORMAT)
        send_length += b' ' * (HEADER - len(send_length))
        client.send(send_length)
        client.send(message)

    def sendmessage():
        mess = MESSAGEFIELD.get()
        MESSAGEFIELD.delete(0, "end")
        send(mess)

    def quitmessage():
        send(DISCONNECT_MESSAGE)
        exit()

    name = FIELD.get()
    FIELD.pack_forget()
    BUTTON.pack_forget()
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(ADDR)
    message = name.encode(FORMAT)
    msg_length = len(message)
    send_length = str(msg_length).encode(FORMAT)
    send_length += b' ' * (HEADER - len(send_length))
    client.send(send_length)
    client.send(message)
    thread = threading.Thread(target=receiving)
    thread.daemon = True
    thread.start()
    MESSAGEFIELD = tk.Entry(TOP)
    SENDBUTTON = tk.Button(TOP, text="Send", command=sendmessage)
    QUITBUTTON = tk.Button(TOP, text="Quit", command=quitmessage)
    TEXTAREA = tk.Listbox(TOP)
    SCROLLBAR = tk.Scrollbar(TOP)
    MESSAGEFIELD.pack()
    SENDBUTTON.pack()
    QUITBUTTON.pack()
    TEXTAREA.pack(side="left", expand=True, fill="both")
    SCROLLBAR.pack(side="right", fill="both")
    SCROLLBAR.config(command=TEXTAREA.yview)

HEADER = 64
PORT = 5050
FORMAT = 'utf-8'
SERVER = "IP ADDRESS OF SERVER"
ADDR = (SERVER, PORT)
DISCONNECT_MESSAGE = "!DISCONNECT"

TOP = tk.Tk()
FIELD = tk.Entry(TOP)
FIELD.insert(0, "Enter Name Here")
BUTTON = tk.Button(TOP, text="Send", command=returnname)
FIELD.pack(expand=True)
BUTTON.pack(expand=True)
TOP.mainloop()

谁能告诉我是否可以解决这个问题?

抱歉,如果我对问题的解释有误。我不善于解释。

为了完成这项工作,我必须做很多事情。

首先,Broken pipe 可能出现在服务器代码中的两个不同位置。第一个是当一个新客户端连接并且服务器尝试向所有客户端发送消息 New Client Connected 时,第二个是当现有客户端发送消息时。所以,我们需要在这两个地方处理异常。

因此,我们将 try/except 放在两个块上。它说 if c != conn.

现在,关于如何处理异常。

正如我首先想到的,只需从客户端列表中删除客户端 c 就可以了,但是当我们尝试在迭代期间修改设置的客户端时,客户端中的 for c 循环会抛出运行时错误。

我尝试了不同的方法来克服这个问题,但这是我得到的最有效的工作方法。

我将客户端从 set() 更改为空列表 [ ]

然后我把clients.add改成了clients.append

然后我将 for 循环更改为 for c in range(len(clients)) 并使用 clients[c] 访问客户端。

但是当我尝试这样做时,我看到如果程序在删除后尝试遍历不存在的客户端,则 if 语句 if clients[c] != conn 可能会抛出列表索引越界错误。所以我也把它放在 try/except 块中,让程序在异常时继续。

for c in range(len(clients)):
        try:
            if clients[c] != conn:
                try:
                    message = connection_message.encode(FORMAT)
                    msg_length = len(message)
                    send_length = str(msg_length).encode(FORMAT)
                    send_length += b' ' * (HEADER - len(send_length))
                    clients[c].sendall(send_length)
                    clients[c].sendall(message)
                except:
                    clients.remove(clients[c])
        except:
            continue

最后一个问题是,即使在删除客户端后,线程仍然存在,因此活动线程数 returns 多于连接的客户端数。因此,我没有将活动连接数打印为活动线程数 - 1,而是打印 len(clients) + 1, + 1 因为在连接新客户端时,它会在将客户端附加到列表之前打印此行。

print(f"[ACTIVE CONNECTIONS] {len(clients) + 1}")

所以,现在整个程序是:

import socket
import threading

HEADER = 64
PORT = 5050
SERVER = socket.gethostbyname(socket.gethostname())
ADDR = (SERVER, PORT)
FORMAT = 'utf-8'
DISCONNECT_MESSAGE = "!DISCONNECT"

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(ADDR)

clients = []
clients_lock = threading.Lock()

def handle_client(conn, addr):
    name = conn.recv(HEADER).decode(FORMAT)
    if name:
        name = int(name)
        msg_name = conn.recv(name).decode(FORMAT)
    print(f"[NEW CONNECTION] {msg_name} connected.")
    connection_message = f"{msg_name} connected."
    with clients_lock:
        for c in range(len(clients)):
            try:
                if clients[c] != conn:
                    try:
                        message = connection_message.encode(FORMAT)
                        msg_length = len(message)
                        send_length = str(msg_length).encode(FORMAT)
                        send_length += b' ' * (HEADER - len(send_length))
                        clients[c].sendall(send_length)
                        clients[c].sendall(message)
                    except:
                        clients.remove(clients[c])

            except:
                continue

    with clients_lock:
        clients.append(conn)

    connected = True
    try:
        while connected:
            msg_length = conn.recv(HEADER).decode(FORMAT)
            if msg_length:
                msg_length = int(msg_length)
                msg1 = conn.recv(msg_length).decode(FORMAT)
                msg = f"{msg_name}: {msg1}"
                if msg1 == DISCONNECT_MESSAGE:
                    connected = False
                print(f"{msg}")
                with clients_lock:
                    for c in range(len(clients)):
                        try:
                            if clients[c] != conn:
                                try:
                                    message = msg.encode(FORMAT)
                                    msg_length = len(message)
                                    send_length = str(msg_length).encode(FORMAT)
                                    send_length += b' ' * (HEADER - len(send_length))
                                    clients[c].sendall(send_length)
                                    clients[c].sendall(message)
                                except:
                                    clients.remove(clients[c])
                        except:
                            continue
                msg = f"You: {msg1}"
                message = msg.encode(FORMAT)
                msg_length = len(message)
                send_length = str(msg_length).encode(FORMAT)
                send_length += b' ' * (HEADER - len(send_length))
                conn.send(send_length)
                conn.send(message)

    finally:
        with clients_lock:
            clients.remove(conn)
            conn.close()

def start():
    server.listen()
    print(f"[LISTENING] Server is listening on {SERVER}")
    while True:
        conn, addr = server.accept()
        thread = threading.Thread(target=handle_client, args=(conn, addr))
        thread.daemon = True
        thread.start()
        print(f"[ACTIVE CONNECTIONS] {len(clients) + 1}")

print("[STARTING] server is starting...")
start()