如何让套接字的 .recv() 不被阻塞?

How can I get a socket's .recv() not to block?

我正在尝试编写一个简单的守护程序来侦听 Unix 套接字上的命令。以下有效,但 connection.recv(1024) 行阻塞,这意味着我无法正常终止服务器:

import socket, os

with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
    server.bind("/tmp/sock")
    server.listen()
    connection, __ = server.accept()
    with connection:
        while True:
            data = connection.recv(1024)
            print("Hi!")  # This line isn't executed 'til data is sent
            if data:
                print(data.decode())

理想情况下,我想将所有这些都放在一个 Thread 中,该 Threadself.LOOP_TIME 秒检查一次 self.should_stop 属性,如果设置了该值到 True,然后退出。但是,由于 .recv() 行块,我的程序除了在任何给定时间等待之外别无他法。

当然有一个正确的方法来做到这一点,但由于我是套接字的新手,我不知道那是什么。

编辑

Jeremy Friesner 的回答让我走上了正确的道路。我意识到我可以允许线程阻塞并简单地设置 .should_stop 然后将 b"" 传递给套接字以便它解除阻塞,看到它应该停止,然后干净地退出。这是最终结果:

import os
import socket

from pathlib import Path
from shutil import rmtree
from threading import Thread


class MyThreadThing(Thread):

    RUNTIME_DIR = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "my-project-name"

    def __init__(self):

        super().__init__(daemon=True)

        self.should_stop = False

        if self.RUNTIME_DIR.exists():
            rmtree(self.RUNTIME_DIR)
        self.RUNTIME_DIR.mkdir(0o700)

        self.socket_path = self.RUNTIME_DIR / "my-project.sock"

    def run(self) -> None:

        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:

            s.bind(self.socket_path.as_posix())
            s.listen()

            while True:

                connection, __ = s.accept()
                action = ""

                with connection:

                    while True:

                        received = connection.recv(1024).decode()
                        action += received
                        if not received:
                            break

                    # Handle whatever is in `action`

                if self.should_stop:
                    break

        self.socket_path.unlink()

    def stop(self):
        """
        Trigger this when you want to stop the listener.
        """
        self.should_stop = True
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.connect(self.socket_path.as_posix())
            s.send(b"")

使用与您的 LOOP_TIME 相同的超时,如下所示:

import socket, os

LOOP_TIME = 10
should_stop = False

with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
    server.bind("/tmp/sock")
    server.listen()
    connection, __ = server.accept()
    connection.settimeout(LOOP_TIME)
    with connection:
        while not should_stop:
            try:
                data = connection.recv(1024)
            except socket.timeout:
                continue
            print("Hi!")  # This line isn't executed 'til data is sent
            if data:
                print(data.decode())

你可以使用select,但如果只是一个简单的socket,这种方式就没那么复杂了。

您可以选择将其放置在带有 self.should_stop 的不同线程中或仅放置在主线程中 - 它现在将监听 KeyboardInterrupt。

使用任意长度的超时总是有点不尽如人意——要么你将超时值设置为相对较长的时间,在这种情况下你的程序对退出请求的反应会变慢,因为它在毫无意义地等待超时期限到期...或者您将超时值设置为相对较短的时间,在这种情况下,您的程序会不断醒来以查看是否应该退出,浪费 CPU 24/7 全天候检查一个可能永远不会到来的事件。

处理该问题的一种更优雅的方法是创建一个管道,并在您希望事件循环退出时在管道上发送一个字节。您的事件循环可以通过 select() 同时“监视”管道的读取端文件描述符和您的网络套接字,并且当该文件描述符指示它已准备好读取时,您的事件循环可以通过退出来响应。这种方法完全是事件驱动的,因此不需要 CPU 唤醒,除非确实有事情要做。

下面是您的程序的示例版本,它实现了 SIGINT 的信号处理程序(也就是按 Control-C)以在管道上发送 please-quit-now 字节:

import socket, os
import select
import signal, sys

# Any bytes written to (writePipeFD) will become available for reading on (readPipeFD)
readPipeFD, writePipeFD = os.pipe()

# Set up a signal-handler to handle SIGINT (aka Ctrl+C) events by writing a byte to the pipe
def signal_handler(sig, frame):
    print("signal_handler() is executing -- SIGINT detected!")
    os.write(writePipeFD, b"[=10=]")  # doesn't matter what we write; a single 0-byte will do
signal.signal(signal.SIGINT, signal_handler)

with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as serverSock:
    serverSock.bind("/tmp/sock")
    serverSock.listen()

    # Wait for incoming connection (or the please-quit signal, whichever comes first)
    connection = None
    while True:
       readReady,writeReady,exceptReady = select.select([readPipeFD,serverSock], [], [])
       if (readPipeFD in readReady):
          print("accept-loop: Someone wrote a byte to the pipe; time to go away!");
          break
       if (connection in readReady):
          connection, __ = serverSock.accept()
          break

    # Read data from incoming connection (or the please-quit signal, whichever comes first)
    if connection:
       with connection:
          while True:
             readReady,writeReady,exceptReady = select.select([readPipeFD,connection], [], [])
             if (readPipeFD in readReady):
                print("Connection-loop: Someone wrote a byte to the pipe; time to go away!");
                break
             if (connection in readReady):
                data = connection.recv(1024)
                print("Hi!")  # This line isn't executed 'til data is sent
                if data:
                   print(data.decode())

    print("Bye!")