如何使用非阻塞套接字
How to use Non-Blocking sockets
我正在尝试编写非阻塞 server/client 脚本。
首先,
这是我的代码:
Server.py ->
import socket
import select
import threading
class ChatServer(threading.Thread):
"""
SERVER THREAD
"""
MAX_WAITING_CONNECTION = 10
RECV_HEADER_LENGTH = 10
def __init__(self, host, port):
"""
Initialize new ChatServer
:param host: Binding Host IP
:param port: Binding Port number
"""
threading.Thread.__init__(self)
self.host = host
self.port = port
self.connections = [] ## Will keep active client connections.
self.clients = {}
self.running = True
def _bind_socket(self):
"""
Creates the server socket and binds it to the given host and port
"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(self.MAX_WAITING_CONNECTION)
self.connections.append(self.server_socket)
def _send(self, sock, client_message):
"""
Prefixes each message with a 4-byte length before sending.
:param sock: the incoming sock
:param msg: the massage to send
"""
user = self.clients[sock]
client_message = user['header'] + user['data'] + client_message['header'] + client_message['data']
sock.send(client_message)
def _receive(self, sock):
try:
## Bytes type header
message_header = sock.recv(self.RECV_HEADER_LENGTH)
if not len(message_header):
return False
message_length = int(message_header.decode('utf-8').strip())
## Bytes type data
return {"header": message_header, "data": sock.recv(message_length)}
except Exception as e:
print('exception occur')
print(e)
return False
def _broadcast(self, sending_client_socket, client_message):
"""
Breadcasts a message to all the clients different from both the server itself and
the client sending the message.
:param client_socket: the socket of the client sending the message
:param client_message: the message to broadcast ({'header': <bytes header>, 'data': <bytes data message>})
"""
for sock in self.clients:
is_not_the_server = sock != self.server_socket
is_not_the_client_sending = sock != sending_client_socket ## sending client socket
if is_not_the_server and is_not_the_client_sending:
try:
user = self.clients[sending_client_socket]
print(f"Type client_message: {type(client_message)}")
print(f"Type user: {type(user)}")
sending_message = user['header'] + user['data'] + client_message['header'] + client_message['data']
sock.send(sending_message)
except socket.error:
## handles a possible disconnection of client "sock" by ..
sock.close()
self.connections.remove(sock) ## removing sock form active connections.
del self.clients[sock]
def _log(self, sock, message):
user = self.clients[sock]
print(f"Received message from {user['data'].decode('utf-8')}: {message['data'].decode('utf-8')}")
def _run(self):
"""
Actually runs the server.
"""
while self.running:
## Get the list of sockets which are ready to be read through select non-blocking calls
## The select has a timeout of 60 seconds
try:
ready_to_read, ready_to_write, in_error = select.select(self.connections, [], self.connections)
except socket.error as e:
print(f"General Error: {e}")
continue
else:
for sock in ready_to_read:
## if socket is server socket.
if sock == self.server_socket:
try:
client_socket, client_address = self.server_socket.accept()
except socket.error as e:
print(f"General Error: {e}")
break
else:
user = self._receive(client_socket)
if user is False:
continue
self.connections.append(client_socket)
self.clients[client_socket] = user
print(f"Accepted new connection from {client_address[0]}:{client_address[1]}..")
else:
message = self._receive(sock) ## Get client message
if message is False:
print(f"Closed connection from {self.clients[sock]['data'].decode('utf-8')}")
self.connections.remove(sock)
del self.clients[sock]
continue
self._log(sock, message)
print(message)
self._broadcast(sock, message)
for sock in in_error:
self.connections.remove(sock)
del self.clients[sock]
self.stop()
def run(self):
"""
Given a host and a port, binds the socket and runs the server.
"""
self._bind_socket()
self._run()
def stop():
"""
Stops the server by setting the "running" flag before closing
the socket connection.
"""
self.running = False
self.server_socket.close()
if __name__ == '__main__':
_HOST = '127.0.0.1'
_PORT = 6667
chat_server = ChatServer(_HOST, _PORT)
chat_server.start()
chat_server.join()
还有我的 client.py ->
import socket
import select
import errno
import threading
import sys
RECV_HEADER_LENGTH = 10
class ChatClient(threading.Thread):
def __init__(self, host, port):
"""
Initialize new ChatClient
:param host: Connect Host IP
:param port: Connect Port number
"""
threading.Thread.__init__(self)
self.host = host
self.port = port
self.username = input("Username: ")
self.running = True
def _send(self, sock, message):
sock.send(message.encode('utf-8'))
def _connect(self):
"""
Connecting to the ChatServer
"""
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect((self.host, self.port))
self.client_socket.setblocking(0)
self.username_header = f"{len(self.username):<{RECV_HEADER_LENGTH}}"
self._send(self.client_socket, self.username_header+self.username)
def prompt(self) :
sys.stdout.write(f"#{self.username}$ ")
sys.stdout.flush()
def _run(self):
"""
Actually run client.
"""
while self.running:
reading_sockets, writing_sockets, exceptional_sockets = select.select([self.client_socket], [self.client_socket], [])
for sock in reading_sockets:
if sock == self.client_socket:
username_header = self.client_socket.recv(RECV_HEADER_LENGTH)
if not len(username_header):
print("Connection closed by the server.")
sys.exit()
username_length = int(username_header.decode("utf-8").strip())
username = self.client_socket.recv(username_length).decode("utf-8")
message_header = self.client_socket.recv(RECV_HEADER_LENGTH)
message_length = int(message_header.decode('utf-8').strip())
message = self.client_socket.recv(message_length).decode('utf-8')
print(f"#{username}$ {message}")
for sock in writing_sockets:
self.prompt()
message = input()
print(len(message))
if not message:
continue
message_header = f"{len(message):<{RECV_HEADER_LENGTH}}"
self._send(sock, message_header+message)
self.stop()
def run(self):
"""
Given a host and a port, binds the socket and runs the server.
"""
self._connect()
self._run()
def stop():
"""
Stops the server by setting the "running" flag before closing
the socket connection.
"""
self.running = False
self.client_socket.close()
if __name__ == '__main__':
_HOST = '127.0.0.1'
_PORT = 6667
chat_server = ChatClient(_HOST, _PORT)
chat_server.start()
chat_server.join()
现在我的问题是 client.py 我想。在 _运行 函数中,我对同一个套接字使用 select reading_socket 和 writing_socket。
当我 运行 这段代码时,阻塞了 reading_socket 的循环。因为在 writing_sockets 的 for 循环中保持我的 shell 并且永远不会释放,即使是另一个按摩来了。
所以我想等待用户输入,但同时阅读其他消息并在 shell 上打印。
我正在使用 python3.7。
我怎样才能做到这一点?
So I want to make wait user input but at the same time read other
messages and print on shell. I am using python3.7. How can I achieve
this?
当 sys.stdin
实际上已经准备好用户输入时,确保只从 sys.stdin
读取;这样您的 input()
呼叫就不会阻塞。您可以通过将 sys.stdin
作为第一个参数中的套接字之一传递给 select()
来实现。 (注意:这在 Windows 下不起作用,因为微软明智地决定他们 select()
的实现不支持在 stdin
上选择。在 Windows 下你'将不得不使用一个单独的线程来阻塞从 stdin
的读取,以及某种线程间消息传递来将从 stdin
读取的数据返回到网络线程,这是一个开始工作的巨大痛苦)
以下是我如何修改您的 _run(self)
方法以获得您想要的行为(在 MacOS/X 下测试):
def _run(self):
"""
Actually run client.
"""
while self.running:
reading_sockets, writing_sockets, exceptional_sockets = select.select([self.client_socket, sys.stdin], [], [])
for sock in reading_sockets:
if sock == self.client_socket:
username_header = self.client_socket.recv(RECV_HEADER_LENGTH)
if not len(username_header):
print("Connection closed by the server.")
sys.exit()
username_length = int(username_header.decode("utf-8").strip())
username = self.client_socket.recv(username_length).decode("utf-8")
message_header = self.client_socket.recv(RECV_HEADER_LENGTH)
message_length = int(message_header.decode('utf-8').strip())
message = self.client_socket.recv(message_length).decode('utf-8')
print(f"#{username}$ {message}")
elif sock == sys.stdin:
self.prompt()
message = input()
print(len(message))
if not message:
continue
message_header = f"{len(message):<{RECV_HEADER_LENGTH}}"
self._send(self.client_socket, message_header+message)
self.stop()
请注意,我将 sys.stdin
添加到 select()
调用的 read-sockets 参数中(以便 select()
将 return 当它们的数据准备就绪时从 stdin
读取),并且我还从 write-sockets 参数中删除了 self.client_socket
(因为将它放在那里会导致 select()
到 return 一旦 self.client_socket
有 buffer-space 来接受更多传出数据,也就是说,几乎所有时间它都会立即 return,这会旋转你的事件循环并导致你的客户端程序使用接近 100%一个核心,这不是你想要的)。
我还修改了您的 read-from-stdin
代码,使其仅在可读套接字为 sys.stdin
时被调用,因为除非它有数据可以提供给您,否则尝试从 stdin 读取是没有意义的;最后我让你的 self._send()
调用在 TCP 套接字上发送,而不是尝试将字节发送回 stdin
(因为标准输入是 read/input-only,所以向它发送字节没有任何意义).
我正在尝试编写非阻塞 server/client 脚本。
首先, 这是我的代码:
Server.py ->
import socket
import select
import threading
class ChatServer(threading.Thread):
"""
SERVER THREAD
"""
MAX_WAITING_CONNECTION = 10
RECV_HEADER_LENGTH = 10
def __init__(self, host, port):
"""
Initialize new ChatServer
:param host: Binding Host IP
:param port: Binding Port number
"""
threading.Thread.__init__(self)
self.host = host
self.port = port
self.connections = [] ## Will keep active client connections.
self.clients = {}
self.running = True
def _bind_socket(self):
"""
Creates the server socket and binds it to the given host and port
"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(self.MAX_WAITING_CONNECTION)
self.connections.append(self.server_socket)
def _send(self, sock, client_message):
"""
Prefixes each message with a 4-byte length before sending.
:param sock: the incoming sock
:param msg: the massage to send
"""
user = self.clients[sock]
client_message = user['header'] + user['data'] + client_message['header'] + client_message['data']
sock.send(client_message)
def _receive(self, sock):
try:
## Bytes type header
message_header = sock.recv(self.RECV_HEADER_LENGTH)
if not len(message_header):
return False
message_length = int(message_header.decode('utf-8').strip())
## Bytes type data
return {"header": message_header, "data": sock.recv(message_length)}
except Exception as e:
print('exception occur')
print(e)
return False
def _broadcast(self, sending_client_socket, client_message):
"""
Breadcasts a message to all the clients different from both the server itself and
the client sending the message.
:param client_socket: the socket of the client sending the message
:param client_message: the message to broadcast ({'header': <bytes header>, 'data': <bytes data message>})
"""
for sock in self.clients:
is_not_the_server = sock != self.server_socket
is_not_the_client_sending = sock != sending_client_socket ## sending client socket
if is_not_the_server and is_not_the_client_sending:
try:
user = self.clients[sending_client_socket]
print(f"Type client_message: {type(client_message)}")
print(f"Type user: {type(user)}")
sending_message = user['header'] + user['data'] + client_message['header'] + client_message['data']
sock.send(sending_message)
except socket.error:
## handles a possible disconnection of client "sock" by ..
sock.close()
self.connections.remove(sock) ## removing sock form active connections.
del self.clients[sock]
def _log(self, sock, message):
user = self.clients[sock]
print(f"Received message from {user['data'].decode('utf-8')}: {message['data'].decode('utf-8')}")
def _run(self):
"""
Actually runs the server.
"""
while self.running:
## Get the list of sockets which are ready to be read through select non-blocking calls
## The select has a timeout of 60 seconds
try:
ready_to_read, ready_to_write, in_error = select.select(self.connections, [], self.connections)
except socket.error as e:
print(f"General Error: {e}")
continue
else:
for sock in ready_to_read:
## if socket is server socket.
if sock == self.server_socket:
try:
client_socket, client_address = self.server_socket.accept()
except socket.error as e:
print(f"General Error: {e}")
break
else:
user = self._receive(client_socket)
if user is False:
continue
self.connections.append(client_socket)
self.clients[client_socket] = user
print(f"Accepted new connection from {client_address[0]}:{client_address[1]}..")
else:
message = self._receive(sock) ## Get client message
if message is False:
print(f"Closed connection from {self.clients[sock]['data'].decode('utf-8')}")
self.connections.remove(sock)
del self.clients[sock]
continue
self._log(sock, message)
print(message)
self._broadcast(sock, message)
for sock in in_error:
self.connections.remove(sock)
del self.clients[sock]
self.stop()
def run(self):
"""
Given a host and a port, binds the socket and runs the server.
"""
self._bind_socket()
self._run()
def stop():
"""
Stops the server by setting the "running" flag before closing
the socket connection.
"""
self.running = False
self.server_socket.close()
if __name__ == '__main__':
_HOST = '127.0.0.1'
_PORT = 6667
chat_server = ChatServer(_HOST, _PORT)
chat_server.start()
chat_server.join()
还有我的 client.py ->
import socket
import select
import errno
import threading
import sys
RECV_HEADER_LENGTH = 10
class ChatClient(threading.Thread):
def __init__(self, host, port):
"""
Initialize new ChatClient
:param host: Connect Host IP
:param port: Connect Port number
"""
threading.Thread.__init__(self)
self.host = host
self.port = port
self.username = input("Username: ")
self.running = True
def _send(self, sock, message):
sock.send(message.encode('utf-8'))
def _connect(self):
"""
Connecting to the ChatServer
"""
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect((self.host, self.port))
self.client_socket.setblocking(0)
self.username_header = f"{len(self.username):<{RECV_HEADER_LENGTH}}"
self._send(self.client_socket, self.username_header+self.username)
def prompt(self) :
sys.stdout.write(f"#{self.username}$ ")
sys.stdout.flush()
def _run(self):
"""
Actually run client.
"""
while self.running:
reading_sockets, writing_sockets, exceptional_sockets = select.select([self.client_socket], [self.client_socket], [])
for sock in reading_sockets:
if sock == self.client_socket:
username_header = self.client_socket.recv(RECV_HEADER_LENGTH)
if not len(username_header):
print("Connection closed by the server.")
sys.exit()
username_length = int(username_header.decode("utf-8").strip())
username = self.client_socket.recv(username_length).decode("utf-8")
message_header = self.client_socket.recv(RECV_HEADER_LENGTH)
message_length = int(message_header.decode('utf-8').strip())
message = self.client_socket.recv(message_length).decode('utf-8')
print(f"#{username}$ {message}")
for sock in writing_sockets:
self.prompt()
message = input()
print(len(message))
if not message:
continue
message_header = f"{len(message):<{RECV_HEADER_LENGTH}}"
self._send(sock, message_header+message)
self.stop()
def run(self):
"""
Given a host and a port, binds the socket and runs the server.
"""
self._connect()
self._run()
def stop():
"""
Stops the server by setting the "running" flag before closing
the socket connection.
"""
self.running = False
self.client_socket.close()
if __name__ == '__main__':
_HOST = '127.0.0.1'
_PORT = 6667
chat_server = ChatClient(_HOST, _PORT)
chat_server.start()
chat_server.join()
现在我的问题是 client.py 我想。在 _运行 函数中,我对同一个套接字使用 select reading_socket 和 writing_socket。
当我 运行 这段代码时,阻塞了 reading_socket 的循环。因为在 writing_sockets 的 for 循环中保持我的 shell 并且永远不会释放,即使是另一个按摩来了。 所以我想等待用户输入,但同时阅读其他消息并在 shell 上打印。 我正在使用 python3.7。 我怎样才能做到这一点?
So I want to make wait user input but at the same time read other messages and print on shell. I am using python3.7. How can I achieve this?
当 sys.stdin
实际上已经准备好用户输入时,确保只从 sys.stdin
读取;这样您的 input()
呼叫就不会阻塞。您可以通过将 sys.stdin
作为第一个参数中的套接字之一传递给 select()
来实现。 (注意:这在 Windows 下不起作用,因为微软明智地决定他们 select()
的实现不支持在 stdin
上选择。在 Windows 下你'将不得不使用一个单独的线程来阻塞从 stdin
的读取,以及某种线程间消息传递来将从 stdin
读取的数据返回到网络线程,这是一个开始工作的巨大痛苦)
以下是我如何修改您的 _run(self)
方法以获得您想要的行为(在 MacOS/X 下测试):
def _run(self):
"""
Actually run client.
"""
while self.running:
reading_sockets, writing_sockets, exceptional_sockets = select.select([self.client_socket, sys.stdin], [], [])
for sock in reading_sockets:
if sock == self.client_socket:
username_header = self.client_socket.recv(RECV_HEADER_LENGTH)
if not len(username_header):
print("Connection closed by the server.")
sys.exit()
username_length = int(username_header.decode("utf-8").strip())
username = self.client_socket.recv(username_length).decode("utf-8")
message_header = self.client_socket.recv(RECV_HEADER_LENGTH)
message_length = int(message_header.decode('utf-8').strip())
message = self.client_socket.recv(message_length).decode('utf-8')
print(f"#{username}$ {message}")
elif sock == sys.stdin:
self.prompt()
message = input()
print(len(message))
if not message:
continue
message_header = f"{len(message):<{RECV_HEADER_LENGTH}}"
self._send(self.client_socket, message_header+message)
self.stop()
请注意,我将 sys.stdin
添加到 select()
调用的 read-sockets 参数中(以便 select()
将 return 当它们的数据准备就绪时从 stdin
读取),并且我还从 write-sockets 参数中删除了 self.client_socket
(因为将它放在那里会导致 select()
到 return 一旦 self.client_socket
有 buffer-space 来接受更多传出数据,也就是说,几乎所有时间它都会立即 return,这会旋转你的事件循环并导致你的客户端程序使用接近 100%一个核心,这不是你想要的)。
我还修改了您的 read-from-stdin
代码,使其仅在可读套接字为 sys.stdin
时被调用,因为除非它有数据可以提供给您,否则尝试从 stdin 读取是没有意义的;最后我让你的 self._send()
调用在 TCP 套接字上发送,而不是尝试将字节发送回 stdin
(因为标准输入是 read/input-only,所以向它发送字节没有任何意义).