如何让套接字的 .recv() 不被阻塞?
How can I get a socket's .recv() not to block?
我正在尝试编写一个简单的守护程序来侦听 Unix 套接字上的命令。以下有效,但 connection.recv(1024)
行阻塞,这意味着我无法正常终止服务器:
import socket, os
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
server.bind("/tmp/sock")
server.listen()
connection, __ = server.accept()
with connection:
while True:
data = connection.recv(1024)
print("Hi!") # This line isn't executed 'til data is sent
if data:
print(data.decode())
理想情况下,我想将所有这些都放在一个 Thread
中,该 Thread
每 self.LOOP_TIME
秒检查一次 self.should_stop
属性,如果设置了该值到 True
,然后退出。但是,由于 .recv()
行块,我的程序除了在任何给定时间等待之外别无他法。
当然有一个正确的方法来做到这一点,但由于我是套接字的新手,我不知道那是什么。
编辑
Jeremy Friesner 的回答让我走上了正确的道路。我意识到我可以允许线程阻塞并简单地设置 .should_stop
然后将 b""
传递给套接字以便它解除阻塞,看到它应该停止,然后干净地退出。这是最终结果:
import os
import socket
from pathlib import Path
from shutil import rmtree
from threading import Thread
class MyThreadThing(Thread):
RUNTIME_DIR = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "my-project-name"
def __init__(self):
super().__init__(daemon=True)
self.should_stop = False
if self.RUNTIME_DIR.exists():
rmtree(self.RUNTIME_DIR)
self.RUNTIME_DIR.mkdir(0o700)
self.socket_path = self.RUNTIME_DIR / "my-project.sock"
def run(self) -> None:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.bind(self.socket_path.as_posix())
s.listen()
while True:
connection, __ = s.accept()
action = ""
with connection:
while True:
received = connection.recv(1024).decode()
action += received
if not received:
break
# Handle whatever is in `action`
if self.should_stop:
break
self.socket_path.unlink()
def stop(self):
"""
Trigger this when you want to stop the listener.
"""
self.should_stop = True
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.connect(self.socket_path.as_posix())
s.send(b"")
使用与您的 LOOP_TIME
相同的超时,如下所示:
import socket, os
LOOP_TIME = 10
should_stop = False
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
server.bind("/tmp/sock")
server.listen()
connection, __ = server.accept()
connection.settimeout(LOOP_TIME)
with connection:
while not should_stop:
try:
data = connection.recv(1024)
except socket.timeout:
continue
print("Hi!") # This line isn't executed 'til data is sent
if data:
print(data.decode())
你可以使用select,但如果只是一个简单的socket,这种方式就没那么复杂了。
您可以选择将其放置在带有 self.should_stop
的不同线程中或仅放置在主线程中 - 它现在将监听 KeyboardInterrupt。
使用任意长度的超时总是有点不尽如人意——要么你将超时值设置为相对较长的时间,在这种情况下你的程序对退出请求的反应会变慢,因为它在毫无意义地等待超时期限到期...或者您将超时值设置为相对较短的时间,在这种情况下,您的程序会不断醒来以查看是否应该退出,浪费 CPU 24/7 全天候检查一个可能永远不会到来的事件。
处理该问题的一种更优雅的方法是创建一个管道,并在您希望事件循环退出时在管道上发送一个字节。您的事件循环可以通过 select()
同时“监视”管道的读取端文件描述符和您的网络套接字,并且当该文件描述符指示它已准备好读取时,您的事件循环可以通过退出来响应。这种方法完全是事件驱动的,因此不需要 CPU 唤醒,除非确实有事情要做。
下面是您的程序的示例版本,它实现了 SIGINT 的信号处理程序(也就是按 Control-C)以在管道上发送 please-quit-now 字节:
import socket, os
import select
import signal, sys
# Any bytes written to (writePipeFD) will become available for reading on (readPipeFD)
readPipeFD, writePipeFD = os.pipe()
# Set up a signal-handler to handle SIGINT (aka Ctrl+C) events by writing a byte to the pipe
def signal_handler(sig, frame):
print("signal_handler() is executing -- SIGINT detected!")
os.write(writePipeFD, b"[=10=]") # doesn't matter what we write; a single 0-byte will do
signal.signal(signal.SIGINT, signal_handler)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as serverSock:
serverSock.bind("/tmp/sock")
serverSock.listen()
# Wait for incoming connection (or the please-quit signal, whichever comes first)
connection = None
while True:
readReady,writeReady,exceptReady = select.select([readPipeFD,serverSock], [], [])
if (readPipeFD in readReady):
print("accept-loop: Someone wrote a byte to the pipe; time to go away!");
break
if (connection in readReady):
connection, __ = serverSock.accept()
break
# Read data from incoming connection (or the please-quit signal, whichever comes first)
if connection:
with connection:
while True:
readReady,writeReady,exceptReady = select.select([readPipeFD,connection], [], [])
if (readPipeFD in readReady):
print("Connection-loop: Someone wrote a byte to the pipe; time to go away!");
break
if (connection in readReady):
data = connection.recv(1024)
print("Hi!") # This line isn't executed 'til data is sent
if data:
print(data.decode())
print("Bye!")
我正在尝试编写一个简单的守护程序来侦听 Unix 套接字上的命令。以下有效,但 connection.recv(1024)
行阻塞,这意味着我无法正常终止服务器:
import socket, os
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
server.bind("/tmp/sock")
server.listen()
connection, __ = server.accept()
with connection:
while True:
data = connection.recv(1024)
print("Hi!") # This line isn't executed 'til data is sent
if data:
print(data.decode())
理想情况下,我想将所有这些都放在一个 Thread
中,该 Thread
每 self.LOOP_TIME
秒检查一次 self.should_stop
属性,如果设置了该值到 True
,然后退出。但是,由于 .recv()
行块,我的程序除了在任何给定时间等待之外别无他法。
当然有一个正确的方法来做到这一点,但由于我是套接字的新手,我不知道那是什么。
编辑
Jeremy Friesner 的回答让我走上了正确的道路。我意识到我可以允许线程阻塞并简单地设置 .should_stop
然后将 b""
传递给套接字以便它解除阻塞,看到它应该停止,然后干净地退出。这是最终结果:
import os
import socket
from pathlib import Path
from shutil import rmtree
from threading import Thread
class MyThreadThing(Thread):
RUNTIME_DIR = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "my-project-name"
def __init__(self):
super().__init__(daemon=True)
self.should_stop = False
if self.RUNTIME_DIR.exists():
rmtree(self.RUNTIME_DIR)
self.RUNTIME_DIR.mkdir(0o700)
self.socket_path = self.RUNTIME_DIR / "my-project.sock"
def run(self) -> None:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.bind(self.socket_path.as_posix())
s.listen()
while True:
connection, __ = s.accept()
action = ""
with connection:
while True:
received = connection.recv(1024).decode()
action += received
if not received:
break
# Handle whatever is in `action`
if self.should_stop:
break
self.socket_path.unlink()
def stop(self):
"""
Trigger this when you want to stop the listener.
"""
self.should_stop = True
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.connect(self.socket_path.as_posix())
s.send(b"")
使用与您的 LOOP_TIME
相同的超时,如下所示:
import socket, os
LOOP_TIME = 10
should_stop = False
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
server.bind("/tmp/sock")
server.listen()
connection, __ = server.accept()
connection.settimeout(LOOP_TIME)
with connection:
while not should_stop:
try:
data = connection.recv(1024)
except socket.timeout:
continue
print("Hi!") # This line isn't executed 'til data is sent
if data:
print(data.decode())
你可以使用select,但如果只是一个简单的socket,这种方式就没那么复杂了。
您可以选择将其放置在带有 self.should_stop
的不同线程中或仅放置在主线程中 - 它现在将监听 KeyboardInterrupt。
使用任意长度的超时总是有点不尽如人意——要么你将超时值设置为相对较长的时间,在这种情况下你的程序对退出请求的反应会变慢,因为它在毫无意义地等待超时期限到期...或者您将超时值设置为相对较短的时间,在这种情况下,您的程序会不断醒来以查看是否应该退出,浪费 CPU 24/7 全天候检查一个可能永远不会到来的事件。
处理该问题的一种更优雅的方法是创建一个管道,并在您希望事件循环退出时在管道上发送一个字节。您的事件循环可以通过 select()
同时“监视”管道的读取端文件描述符和您的网络套接字,并且当该文件描述符指示它已准备好读取时,您的事件循环可以通过退出来响应。这种方法完全是事件驱动的,因此不需要 CPU 唤醒,除非确实有事情要做。
下面是您的程序的示例版本,它实现了 SIGINT 的信号处理程序(也就是按 Control-C)以在管道上发送 please-quit-now 字节:
import socket, os
import select
import signal, sys
# Any bytes written to (writePipeFD) will become available for reading on (readPipeFD)
readPipeFD, writePipeFD = os.pipe()
# Set up a signal-handler to handle SIGINT (aka Ctrl+C) events by writing a byte to the pipe
def signal_handler(sig, frame):
print("signal_handler() is executing -- SIGINT detected!")
os.write(writePipeFD, b"[=10=]") # doesn't matter what we write; a single 0-byte will do
signal.signal(signal.SIGINT, signal_handler)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as serverSock:
serverSock.bind("/tmp/sock")
serverSock.listen()
# Wait for incoming connection (or the please-quit signal, whichever comes first)
connection = None
while True:
readReady,writeReady,exceptReady = select.select([readPipeFD,serverSock], [], [])
if (readPipeFD in readReady):
print("accept-loop: Someone wrote a byte to the pipe; time to go away!");
break
if (connection in readReady):
connection, __ = serverSock.accept()
break
# Read data from incoming connection (or the please-quit signal, whichever comes first)
if connection:
with connection:
while True:
readReady,writeReady,exceptReady = select.select([readPipeFD,connection], [], [])
if (readPipeFD in readReady):
print("Connection-loop: Someone wrote a byte to the pipe; time to go away!");
break
if (connection in readReady):
data = connection.recv(1024)
print("Hi!") # This line isn't executed 'til data is sent
if data:
print(data.decode())
print("Bye!")