如何在 python 中编写多处理 Web 服务器

How to write a multiprocessing web server in python

我在 python 中有一个简单的网络服务器,它根据一些配置响应请求。配置定义 OKNOKTimeoutNull 响应的百分比:

import socket
import sys
import os
import datetime
import random
import time


# define globals
global log_file
global configs

dash = '-'
sep = '\n' + 100 * dash + '\n'
ok_message = 'HTTP/1.0 200 OK\n\n'
nok_message = 'HTTP/1.0 404 NotFound\n\n'


def initialize():
    if not os.path.isdir('./logs'):
        os.mkdir(os.path.abspath('./logs'))
    path = os.path.abspath(os.path.join(os.path.abspath('./logs'),
            datetime.datetime.now().strftime('%d-%m-%Y %H-%M-%S')))
    os.mkdir(path)
    log_file = open(os.path.join(path, 'received_packets.log'), 'a')


def finalize():
    log_file.close()


def select_resp_type():
    percents = {}
    for key, val in configs.items():
        if key.endswith('Percent'):
            percents.update({key: int(val)})
    items = [x.replace('Percent', '') for x, v in percents.items()
             if (float(counts[x.replace('Percent', '')]) / counts['all_packets']) * 100 < v]
    print items
    print [(float(counts[x.replace('Percent', '')]) / counts['all_packets']) * 100 for x, v in percents.items()]
    if len(items):
        selected = random.choice(items)
        counts[selected] += 1
        return selected
    sys.stdout('Everything is done!')
    sys.exit(0)


def get_response():
    resp_type = select_resp_type()
    if resp_type == 'ok':
        return ok_message
    elif resp_type == 'nok':
        return nok_message
    elif resp_type == 'nok':
        time.sleep(int(configs['timeoutAmount']))
        return ok_message
    elif resp_type == 'nok':
        time.sleep(int(configs['timeoutAmount']))
        return None


def load_configs(config):
    if not os.path.isfile(config):
        log_file.write('No such file ' + os.path.abspath(config))
        sys.exit(1)
    config_lines = open(config, 'r').readlines()
    configs = {}
    for line in config_lines:
        if line.strip() == '' or line.strip().startswith('#'):
            continue
        configs.update({line.split('=')[0].strip(): line.split('=')[1].strip()})


if __name__ == '__main__':
    initialize()
    config = sys.argv[3]
    load_configs(config)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((str(configs['host']), int(configs['port'])))
    s.listen(1)
    try:
        while True:
            s_sock, s_addr = s.accept()
            sfile = s_sock.makefile('rw', 0)
            content = sfile.readline().strip()
            while content != '':
                log_file.write(content + sep)
                resp = get_response()
                if resp:
                sfile.write(resp)
                sfile = s_sock.makefile('rw', 0)
                content = sfile.readline().strip()
            sfile.close()
            s_sock.close()
    except:
        print 'an exception occurred!'
        sys.exit(1)
    finally:
        finalize()

这是我的配置文件:

# server configurations
host = 127.0.0.1
port = 8000
okPercent = 80
nokPercent = 20
nullPercent = 0
timeoutPercent = 0
timeoutAmount = 120
maxClients = 10

我想将此脚本更改为多处理(我的意思是非阻塞,以便可以处理多个请求)Web 服务器,但我不知道从哪里开始以及如何做。有帮助吗?

编辑 1:

根据@Jan-Philip Gehrcke 的回答,我更改了脚本以使用 gevent 库:

def answer(s):
    try:
        gevent.sleep(1)
        s_sock, s_addr = s.accept()
        print conn_sep + 'Receive a connection from ' + str(s_addr)
        while True:
            content = s_sock.recv(1024)
            counts['all_packets'] += 1
            log_file.write(packet_sep + content)
            resp = get_response()
            if resp:
                s_sock.send(resp)
    except:
         print 'An error occurred in connection with ', s_addr, '; quiting...'



if __name__ == '__main__':
    log_dir = sys.argv[2]
    log_file = initialize(sys.argv[2])
    config = sys.argv[1]
    configs = load_configs(config)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((str(configs['host']), int(configs['port'])))
    s.listen(int(configs['maxClients']))
    threads = [gevent.spawn(answer, s) for i in xrange(int(configs['maxClients']))]
    gevent.joinall(threads)

没有任何变化。尽管如此,如果我 运行 多个客户端连接到服务器,每个客户端都应该等待前一个客户端断开连接。也许我错过了什么。有什么想法吗?

编辑 2:

正如@Paul Rooney 所说,我也尝试在主块中接受请求:

def answer(server_sock):
    try:
        gevent.sleep(1)
        while True:
            content = server_sock.recv(1024)
            counts['all_packets'] += 1
            log_file.write(packet_sep + content)
            resp = get_response()
            if resp:
                server_sock.send(resp)
    except:
         print 'An error occurred in connection with ', s_addr, '; quiting...'



if __name__ == '__main__':
    log_dir = sys.argv[2]
    log_file = initialize(sys.argv[2])
    config = sys.argv[1]
    configs = load_configs(config)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((str(configs['host']), int(configs['port'])))
    s.listen(int(configs['maxClients']))
    s_sock, s_addr = s.accept()
    print conn_sep + 'Receive a connection from ' + str(s_addr)
    threads = [gevent.spawn(answer, s_sock) for i in xrange(int(configs['maxClients']))]
    gevent.joinall(threads)

首先,我对并发连接有相同的结果;在以前的客户死亡之前,不会回答任何请求。其次,当第一个客户端断开连接时,我在服务器中收到以下错误并终止:

Traceback (most recent call last):
  File "/opt/python2.7/lib/python2.7/site-packages/gevent-1.0.1-py2.7-linux-x86_64.egg/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "main.py", line 149, in answer
    server_sock.send(resp)
error: [Errno 32] Broken pipe
<Greenlet at 0x1e202d0: answer(<socket._socketobject object at 0x1dedad0>)> failed with error

似乎当第一个客户端断开连接时,它关闭了它的套接字并且该套接字不再可用;因此其他已连接的等待客户端无法再得到答复。

"I want to change this script to be a multiprocessing (by which I mean non-blocking, so that multiple requests can be processed)"

确实,您的意思是 "non-blocking",这是正确的术语。在做任何事情之前,您需要认识到这是一个复杂的主题,您需要了解一些并发架构。

"concurrency" 是让多个事情同时发生的概念(而实际上我们经常需要有效使用单个 CPU 核心而不是 real同时)。

相信我,这不是一个微不足道的话题。许多人在这里采用的一种方法是通过 gevent(搜索那个)monkey-patch socket 模块。这将允许同时处理许多网络连接,而无需更改您的代码。实际上,你的问题是 gevent 的一个典型例子。看看吧。

这是如何运作的? Gevent 在幕后安装了一个基于 greenlet 的机器,并通过 libev 监控你打开的套接字以获取 I/O 事件。每个网络连接都在其自己的执行上下文中处理(so-called 协程,由 greenlet 实现)。在幕后,执行流程然后 在协程之间跳跃 ,具体取决于套接字上 I/O 事件的顺序。这其实是一个复杂的话题,5分钟之内你是看不懂的。

gevent/greenlet/coroutines/even-driven 架构的核心概念是:

  1. 即时检测您的程序何时会等待 I/O
  2. 改为做一些其他工作

要实现这一点不需要多个 CPU 核心,这就是为什么 "multiprocessing" 在您的标题中不是一个好术语。

在最简单的层面上,您可以做的是每次 accept 调用 returns 时生成一个新进程,并将该进程传递给由 accept 返回的客户端套接字。

您有效地将请求的处理卸载到 child 进程,让主进程自由处理新请求,同样将它们卸载到新的 child 进程。

我找到的方法,我并不是说它是完美的答案,但它对我有用(Debian Python 2.7.3)。

与您的原始代码有些相似的简单示例,仅用于演示何时生成进程。

import socket
import sys
import time
import errno
from multiprocessing import Process

ok_message = 'HTTP/1.0 200 OK\n\n'
nok_message = 'HTTP/1.0 404 NotFound\n\n'

def process_start(s_sock):

    content = s_sock.recv(32)
    s_sock.send(ok_message)
    s_sock.close()
    #time.sleep(10)
    sys.exit(0) # kill the child process

if __name__ == '__main__':
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((sys.argv[1], int(sys.argv[2])))
    print 'listen on address %s and port %d' % (sys.argv[1], int(sys.argv[2]))
    s.listen(1)
    try:
        while True:
            try:
                s_sock, s_addr = s.accept()
                p = Process(target=process_start, args=(s_sock,))
                p.start()

            except socket.error:
                # stop the client disconnect from killing us
                print 'got a socket error'

    except Exception as e:
        print 'an exception occurred!',
        print e
        sys.exit(1)
    finally:
        s.close()

需要注意的是

s_sock, s_addr = s.accept()
p = Process(target=process_start, args=(s_sock,))
p.start()

这里是您生成进程以响应 accept 返回的地方。

def process_start(s_sock):

    content = s_sock.recv(32)
    s_sock.send(ok_message)
    s_sock.close()
    #time.sleep(10)
    sys.exit(0) # kill the child process

这是启动新进程的函数,获取传递给它的套接字并发送响应(你会在这里做更多)。然后杀死 child。我不是 100% 确定这是杀死 child 进程的正确方法,或者甚至需要杀死它。如果需要,也许有人可以纠正我或编辑答案。

我可以看到,即使我取消注释 time.sleep 调用,我也可以立即从多个客户端套接字获得响应。

就系统资源和性能而言,greenlets 方式无疑是一种更好的方式。