如果 TCP 客户端能够暂停服务器,当 TCP 服务器读取非阻塞套接字时

Should a TCP client be able to pause the server, when the TCP server reads a non-blocking socket

概览

我有一个简单的问题,代码如下。希望我没有在代码中犯错。

我是一名网络工程师,我需要测试我们的业务应用程序在网络中断期间保持活动状态的某些 linux 行为(我将稍后插入一些 iptables 东西以连接连接 - 首先我想确保我的客户端和服务器正确。

作为我正在进行的网络故障测试的一部分,我编写了一个非阻塞 Python TCP 客户端和服务器,它们应该在循环中盲目地相互发送消息。为了了解发生了什么,我正在使用循环计数器。

服务器的循环应该相对简单。我循环遍历 select 表示已准备就绪的每个 fd。我什至从未在服务器代码中的任何地方导入 sleep 。从这个角度来看,我不希望服务器代码在客户端套接字上循环时暂停 ,但由于某种原因,服务器代码会间歇性暂停 (更多详细信息,下文)。

我最初并没有在客户的循环中睡觉。在客户端没有休眠的情况下,服务器和客户端似乎和我想要的一样高效。但是,当我在客户端对服务器fd.send()执行fd.send()放置time.sleep(1)语句时,TCP服务器代码间歇性 在客户睡觉时暂停。

我的问题:

  • 我是否可以编写一个单线程 Python TCP 服务器,当客户端在客户端的 fd.send() 循环中点击 time.sleep() 时不会暂停?如果是这样,我做错了什么? <- 已回答
  • 如果我正确地编写了这个测试代码并且服务器不应该暂停,为什么 TCP 服务器在轮询时 间歇性 暂停客户端的数据连接?

重现场景

我在两台 RHEL6 linux 机器上 运行 这个。要重现问题...

  • 打开两个不同的终端。
  • 将客户端和服务器脚本保存在不同的文件中
  • 将 shebang 路径更改为本地 python(我使用的是 Python 2.7.15)
  • 将客户端代码中的 SERVER_HOSTNAMESERVER_DOMAIN 更改为您所在 运行
  • 服务器的主机名和域
  • 先启动服务器,再启动客户端。

客户端连接后,您会看到在服务器终端中快速滚动显示的消息,如 EXHIBIT 1 所示。 几秒钟后 当客户端点击 time.sleep() 时,滚动会间歇性地 暂停。我不希望看到那些停顿,但也许我误解了什么。

展览 1

---
LOOP_COUNT 0
---
LOOP_COUNT 1
---
LOOP_COUNT 2
---
LOOP_COUNT 3
CLIENTMSG: 'client->server 0'
---
LOOP_COUNT 4
---
LOOP_COUNT 5
---
LOOP_COUNT 6
---
LOOP_COUNT 7
---
LOOP_COUNT 8
---
LOOP_COUNT 9
---
LOOP_COUNT 10
---
LOOP_COUNT 11
---

总结决议

If I wrote this test code correctly and the server shouldn't pause, why is the TCP server intermittently pausing while it polls the client's connection for data?

回答我自己的问题。我的阻塞问题是由使用非零超时调用 select() 引起的。

当我将 select() 更改为使用零秒超时时,我得到了预期的结果。

最终非阻塞代码(在答案中结合建议):

tcp_server.py

#!/usr/bin/python -u
from socket import AF_INET, SOCK_STREAM, SO_REUSEADDR, SOL_SOCKET
from socket import MSG_DONTWAIT
#from socket import MSG_OOB  <--- for send()
from socket import socket
import socket as socket_module
import select
import errno
import fcntl
import time
import sys
import os

def get_errno_info(e, op='', debugmsg=False):
    """Return verbose information from errno errors, such as errors returned by python socket()"""
    VALID_OP = set(['accept', 'connect', 'send', 'recv', 'read', 'write'])
    assert op.lower() in VALID_OP, "op must be: {0}".format(
        ','.join(sorted(VALID_OP)))

    ## ref: man 3 errno (in linux)... other systems may be man 2 intro
    ##   also see https://docs.python.org/2/library/errno.html
    try:
        retval_int = int(e.args[0])         # Example: 32
        retval_str = os.strerror(e.args[0]) # Example: 'Broken pipe'
        retval_code = errno.errorcode.get(retval_int, 'MODULEFAIL') # Ex: EPIPE
    except:
        ## I don't expect to get here unless something broke in python errno...
        retval_int  = -1
        retval_str  = '__somethingswrong__'
        retval_code = 'BADFAIL'

    if debugmsg:
        print "DEBUG: Can't {0}() on socket (errno:{1}, code:{2} / {3})".format(
            op, retval_int, retval_code, retval_str)
    return retval_int, retval_str, retval_code


host = ''
port = 6667     # IRC service
DEBUG = True

serv_sock = socket(AF_INET, SOCK_STREAM)
serv_sock.setsockopt(SOL_SOCKET, SOCK_STREAM, 1)
serv_sock.bind((host, port))
serv_sock.listen(5)

#fcntl.fcntl(serv_sock, fcntl.F_SETFL, os.O_NONBLOCK)  # Make the socket non-blocking
serv_sock.setblocking(False)

sock_list = [serv_sock]

from_client_str = '__DEFAULT__'

to_client_idx = 0
loop_count = 0
need_send_select = False
while True:
    if need_send_select:
        # Only do this after send() EAGAIN or EWOULDBLOCK...
        send_sock_list = sock_list
    else:
        send_sock_list = []

    #print "---"
    #print "LOOP_COUNT",  loop_count

    recv_ready_list, send_ready_list, exception_ready = select.select(
        sock_list, send_sock_list, [], 0.0)  # Last float is the select() timeout...


    ## Read all sockets which are output-ready... might be client or server...
    for sock_fd in recv_ready_list:

        # accept() if we're reading on the server socket...
        if sock_fd is serv_sock:
            try:
                clientsock, clientaddr = sock_fd.accept()
            except socket_module.error, e:
                errstr, errint, errcode = get_errno_info(e, op='accept',
                    debugmsg=DEBUG)

            assert sock_fd.gettimeout()==0.0, "client socket should be in non-blocking mode"
            sock_list.append(clientsock)

        # read input from the client socket...
        else:
            try:
                from_client_str = sock_fd.recv(1024, MSG_DONTWAIT)
                if from_client_str=='':
                    # Client closed the socket...
                    print "CLIENT CLOSED SOCKET"
                    sock_list.remove(sock_fd)
            except socket_module.error, e:
                errstr, errint, errcode = get_errno_info(e, op='recv',
                    debugmsg=DEBUG)
                if errcode=='EAGAIN' or errcode=='EWOULDBLOCK':
                    # socket unavailable to read()
                    continue
                elif errcode=='ECONNRESET' or errcode=='EPIPE':
                    # Client closed the socket...
                    sock_list.remove(sock_fd)
                else:
                    print "UNHANDLED SOCKET ERROR", errcode, errint, errstr
                    sys.exit(1)


            print "from_client_str: '{0}'".format(from_client_str)

    ## Adding dynamic_list, per input from EJP, below...
    if need_send_select is False:
        dynamic_list = sock_list
    else:
        dynamic_list = send_ready_list
    ## NOTE:  socket code shouldn't walk this list unless a write is pending...
    ##      broadast the same message to all clients...
    for sock_fd in dynamic_list:

        ## Ignore server's listening socket...
        if sock_fd is serv_sock:
            ## Only send() to accept()ed sockets...
            continue

        try:

            to_client_str = "server->client: {0}\n".format(to_client_idx)
            send_retval = sock_fd.send(to_client_str, MSG_DONTWAIT)
            ## send() returns the number of bytes written, on success
            ##     disabling assert check on sent bytes while using MSG_DONTWAIT
            #assert send_retval==len(to_client_str)

            to_client_idx += 1
            need_send_select = False
        except socket_module.error, e:
            errstr, errint, errcode = get_errno_info(e, op='send',
                debugmsg=DEBUG)
            if errcode=='EAGAIN' or errcode=='EWOULDBLOCK':
                need_send_select = True
                continue
            elif errcode=='ECONNRESET' or errcode=='EPIPE':
                # Client closed the socket...
                sock_list.remove(sock_fd)
            else:
                print "FATAL UNHANDLED SOCKET ERROR", errcode, errint, errstr
                sys.exit(1)

    loop_count += 1

tcp_client.py

#!/usr/bin/python -u
from socket import AF_INET, SOCK_STREAM
from socket import MSG_DONTWAIT    # non-blocking send/recv; see man 2 recv
from socket import gethostname, socket
import socket as socket_module
import select
import fcntl
import errno
import time
import sys
import os

## NOTE: Using this script to simulate a scheduler
SERVER_HOSTNAME = 'myServerHostname'
SERVER_DOMAIN = 'mydomain.local'
PORT = 6667
DEBUG = True

def get_errno_info(e, op='', debugmsg=False):
    """Return verbose information from errno errors, such as errors returned by python socket()"""
    VALID_OP = set(['accept', 'connect', 'send', 'recv', 'read', 'write'])
    assert op.lower() in VALID_OP, "op must be: {0}".format(
        ','.join(sorted(VALID_OP)))

    ## ref: man 3 errno (in linux)... other systems may be man 2 intro
    ##   also see https://docs.python.org/2/library/errno.html
    try:
        retval_int = int(e.args[0])         # Example: 32
        retval_str = os.strerror(e.args[0]) # Example: 'Broken pipe'
        retval_code = errno.errorcode.get(retval_int, 'MODULEFAIL') # Ex: EPIPE
    except:
        ## I don't expect to get here unless something broke in python errno...
        retval_int  = -1
        retval_str  = '__somethingswrong__'
        retval_code = 'BADFAIL'

    if debugmsg:
        print "DEBUG: Can't {0}() on socket (errno:{1}, code:{2} / {3})".format(
            op, retval_int, retval_code, retval_str)
    return retval_int, retval_str, retval_code


connect_finished = False
while not connect_finished:
    try:
        c2s = socket(AF_INET, SOCK_STREAM) # Client to server socket...
        # Set socket non-blocking
        #fcntl.fcntl(c2s, fcntl.F_SETFL, os.O_NONBLOCK)
        c2s.connect(('.'.join((SERVER_HOSTNAME, SERVER_DOMAIN,)), PORT))
        c2s.setblocking(False)
        assert c2s.gettimeout()==0.0, "c2s socket should be in non-blocking mode"
        connect_finished = True
    except socket_module.error, e:
        errstr, errint, errcode = get_errno_info(e, op='connect',
            debugmsg=DEBUG)
        if errcode=='EINPROGRESS':
            pass

to_srv_idx = 0
need_send_select = False
while True:
    socket_list = [c2s]

    # Get the list sockets which can: take input, output, etc...
    if need_send_select:
        # Only do this after send() EAGAIN or EWOULDBLOCK...
        send_sock_list = socket_list
    else:
        send_sock_list = []
    recv_ready_list, send_ready_list, exception_ready = select.select(
        socket_list, send_sock_list, [])

    for sock_fd in recv_ready_list:
        assert sock_fd is c2s, "Strange socket failure here"

        #incoming message from remote server
        try:
            from_srv_str = sock_fd.recv(1024, MSG_DONTWAIT)
        except socket_module.error, e:
            ## 
            errstr, errint, errcode = get_errno_info(e, op='recv',
                debugmsg=DEBUG)
            if errcode=='EAGAIN' or errcode=='EWOULDBLOCK':
                # Busy, try again later...
                print "recv() BLOCKED"
                continue
            elif errcode=='ECONNRESET' or errcode=='EPIPE':
                # Server ended normally...
                sys.exit(0)

        ## NOTE: if we get this far, we successfully received from_srv_str.
        ##    Anything caught above, is some kind of fail...
        print "from_srv_str: {0}".format(from_srv_str)

    ## Adding dynamic_list, per input from EJP, below...
    if need_send_select is False:
        dynamic_list = socket_list
    else:
        dynamic_list = send_ready_list
    for sock_fd in dynamic_list:
        # outgoing message to remote server
        if sock_fd is c2s:
            try:
                to_srv_str = 'client->server {0}'.format(to_srv_idx)
                sock_fd.send(to_srv_str, MSG_DONTWAIT)

                               ##
                time.sleep(1)  ## Client blocks the server here... Why????
                               ##

                to_srv_idx += 1
                need_send_select = False
            except socket_module.error, e:
                errstr, errint, errcode = get_errno_info(e, op='send',
                    debugmsg=DEBUG)
                if errcode=='EAGAIN' or errcode=='EWOULDBLOCK':
                    ## Try to send() later...
                    print "send() BLOCKED"
                    need_send_select = True
                    continue
                elif errcode=='ECONNRESET' or errcode=='EPIPE':
                    # Server ended normally...
                    sys.exit(0)

原题码:

tcp_server.py

#!/usr/bin/python -u
from socket import AF_INET, SOCK_STREAM, SO_REUSEADDR, SOL_SOCKET
#from socket import MSG_OOB  <--- for send()
from socket import socket
import socket as socket_module
import select
import fcntl
import os

host = ''
port = 9997

serv_sock = socket(AF_INET, SOCK_STREAM)
serv_sock.setsockopt(SOL_SOCKET, SOCK_STREAM, 1)
serv_sock.bind((host, port))
serv_sock.listen(5)

fcntl.fcntl(serv_sock, fcntl.F_SETFL, os.O_NONBLOCK)  # Make the socket non-blocking

sock_list = [serv_sock]

from_client_str = '__DEFAULT__'

to_client_idx = 0
loop_count = 0
while True:
    recv_ready_list, send_ready_list, exception_ready = select.select(sock_list, sock_list,
        [], 5)

    print "---"
    print "LOOP_COUNT",  loop_count

    ## Read all sockets which are input-ready... might be client or server...
    for sock_fd in recv_ready_list:

        # accept() if we're reading on the server socket...
        if sock_fd is serv_sock:
            clientsock, clientaddr = sock_fd.accept()
            sock_list.append(clientsock)

        # read input from the client socket...
        else:
            try:
                from_client_str = sock_fd.recv(4096)
                if from_client_str=='':
                    # Client closed the socket...
                    print "CLIENT CLOSED SOCKET"
                    sock_list.remove(sock_fd)
            except socket_module.error, e:
                print "WARNING RECV FAIL"


            print "from_client_str: '{0}'".format(from_client_str)

    for sock_fd in send_ready_list:
        if sock_fd is not serv_sock:
            try:
                to_client_str = "server->client: {0}\n".format(to_client_idx)
                sock_fd.send(to_client_str)
                to_client_idx += 1
            except socket_module.error, e:
                print "TO CLIENT SEND ERROR", e

    loop_count += 1

tcp_client.py

#!/usr/bin/python -u
    
from socket import AF_INET, SOCK_STREAM
from socket import gethostname, socket
import socket as socket_module
import select
import fcntl
import errno
import time
import sys
import os

## NOTE: Using this script to simulate a scheduler
SERVER_HOSTNAME = 'myHostname'
SERVER_DOMAIN = 'mydomain.local'
PORT = 9997

def handle_socket_error_continue(e):
    ## non-blocking socket info from:
    ## 
    print "HANDLE_SOCKET_ERROR_CONTINUE"
    err = e.args[0]
    if (err==errno.EAGAIN) or (err==errno.EWOULDBLOCK):
        print 'CLIENT DEBUG: No data input from server'
        return True
    else:
        print 'FROM SERVER RECV ERROR: {0}'.format(e)
        sys.exit(1)

c2s = socket(AF_INET, SOCK_STREAM) # Client to server socket...
c2s.connect(('.'.join((SERVER_HOSTNAME, SERVER_DOMAIN,)), PORT))
# Set socket non-blocking...
fcntl.fcntl(c2s, fcntl.F_SETFL, os.O_NONBLOCK)

to_srv_idx = 0
while True:
    socket_list = [c2s]

    # Get the list sockets which can: take input, output, etc...
    recv_ready_list, send_ready_list, exception_ready = select.select(
        socket_list, socket_list, [])

    for sock_fd in recv_ready_list:
        assert sock_fd is c2s, "Strange socket failure here"

        #incoming message from remote server
        try:
            from_srv_str = sock_fd.recv(4096)
        except socket_module.error, e:
            ## 
            err_continue = handle_socket_error_continue(e)
            if err_continue is True:
                continue
        else:
            if len(from_srv_str)==0:
                print "SERVER CLOSED NORMALLY"
                sys.exit(0)

        ## NOTE: if we get this far, we successfully received from_srv_str.
        ##    Anything caught above, is some kind of fail...
        print "from_srv_str: {0}".format(from_srv_str)

    for sock_fd in send_ready_list:
        #incoming message from remote server
        if sock_fd is c2s:
            #to_srv_str = raw_input('Send to server: ')
            try:
                to_srv_str = 'client->server {0}'.format(to_srv_idx)
                sock_fd.send(to_srv_str)

                               ##
                time.sleep(1)  ## Client blocks the server here... Why????
                               ##

                to_srv_idx += 1
            except socket_module.error, e:
                print "TO SERVER SEND ERROR", e

TCP 套接字几乎总是准备好写入,除非它们的套接字发送缓冲区已满。

因此,总是select关于套接字的可写性是不正确的。您应该仅在遇到由于 EAGAIN/EWOULDBLOCK 导致的发送失败后才这样做。否则您的服务器将无意识地旋转处理可写套接字,通常是所有套接字。

However, when I put a time.sleep(1) statement after the client does an fd.send() to the server, the TCP server code intermittently pauses while the client is sleeping.

来自 运行 提供的代码的 AFAICT(很好的独立示例,顺便说一句),服务器按预期运行。

特别是,select() 调用的语义是 select() 不应该 return 直到线程有事情要做。将线程块放在 select() 中是一件 好事 ,因为线程现在无能为力,因为它可以防止线程旋转 CPU无缘无故。

所以在这种情况下,您的服务器程序告诉 select() 它希望 select() 到 return 只有当至少满足以下条件之一时:

  1. serv_sock ready-for-read(也就是说,一个新的客户端现在想连接到服务器)
  2. serv_sock 已准备好写入(我不相信这实际上发生在监听套接字上,所以这个标准可能会被忽略)
  3. clientsock 已准备好读取(即,客户端已向服务器发送了一些字节,它们正在 clientsock 的缓冲区中等待服务器线程到 recv()他们)
  4. clientsock 已准备好写入(即,clientsock 在其传出数据缓冲区中有一些空间,服务器可以 send() 将数据写入其中(如果需要)将一些数据发送回客户端)
  5. 自对 select() 的调用开始阻塞以来已经过去了五秒。

我看到(通过打印调试)当您的服务器程序阻塞时,它在 select() 内部阻塞,这表明在阻塞期间满足上述 5 个条件中的 none-期间。

这是为什么?好吧,让我们往下看。

  1. 未满足,因为没有其他客户端正在尝试连接
  2. 未满足,因为这从未发生过
  3. 未满足,因为服务器已读取连接的客户端发送的所有数据(并且由于连接的客户端本身正在休眠,它不再发送任何数据)
  4. 未满足,因为服务器已填满其 clientsock 的传出数据缓冲区(因为客户端程序正在休眠,它只是间歇性地读取来自服务器的数据,TCP 层保证 lossless/in-order 传输,所以一旦 clientsock 的传出数据缓冲区已满,clientsock 将不会 select-as-ready-for-write unless/until客户端至少从它的连接端读取了一些数据)
  5. 未满足,因为自 select() 开始阻塞以来还未过去 5 秒。

那么这种行为实际上是服务器的问题吗?事实上不是,因为服务器仍然会响应连接到服务器的任何其他客户端。特别是,只要 serv_sock 或任何其他客户端的套接字 select() 处于可读(或可写)状态,select() 仍将立即 return 等服务器可以在等待 hacked/slow 客户端唤醒时很好地处理其他客户端。

hacked/slow客户端对于用户可能是个问题,但服务器对此无能为力(除了强行断开客户端的 TCP 连接,或者可能会打印出一条日志消息,要求有人调试连接的客户端程序,我想 :)).

我同意 EJP,顺便说一句——select准备写入应该只在您实际想要写入一些数据的套接字上完成。如果您实际上并不想尽快写入套接字,那么在该套接字准备好写入时立即指示 select() 到 return 是毫无意义且适得其反的:做的问题因此,只要任何套接字的传出数据缓冲区未满(在大多数应用程序中,大部分都是时间!)。该问题的用户可见症状是您的服务器程序正在使用 100% 的 CPU 核心,即使它应该空闲或大部分空闲。