ZMQ 中的 HTTP 服务器或如何使用 pyzmq 处理 POST 请求?
HTTP server in ZMQ or How to handle a POST request with pyzmq?
我正在尝试使用 `ZMQ_STREAM` 套接字创建一个 HTTP 服务器。
当我发送一个简单的 `POST` 请求时:
POST / HTTP/1.1
Host: localhost:5555
Cache-Control: no-cache
Postman-Token: 67004be5-56bc-c1a9-847a-7db3195c301d

Apples to Oranges!
这是我用 pyzmq 处理这个问题的方法:
# Question's receive loop: gathers the frames seen for one request into
# `parts` and prints them, to inspect what a ZMQ_STREAM socket delivers.
# NOTE(review): indentation was lost in this copy and has been reconstructed;
# it assumes the second and third recv calls belong to the `if not msg:`
# branch, which is consistent with the printed `parts` list — confirm.
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")
while True:
    # Get HTTP request
    parts = []
    id_, msg = socket.recv_multipart() # [id, ''] or [id, http request]
    parts.append(id_)
    parts.append(msg)
    if not msg:
        # An empty data frame announces a new connection and carries only the
        # connection id; the request itself arrives in the next message,
        # tagged with the same id.
        id_, msg = socket.recv_multipart() # [id, http request]
        parts.append(id_)
        parts.append(msg)
        # Not junk: a *new* id paired with an empty frame is the connect
        # notification of the next connection (the id counter went from
        # ')' == 41 to '*' == 42). recv_multipart blocks by default, so the
        # loop cannot print until another client connects.
        end = socket.recv_multipart() # [new id, ''] <- next connection's connect notification
        parts.append(end)
    print("%s" % repr(parts))
所以 `parts` 列表是:
['\x00\x80\x00\x00)', '', '\x00\x80\x00\x00)', 'POST / HTTP/1.1\r\nHost: localhost:5555\r\nConnection: keep-alive\r\nContent-Length: 18\r\nCache-Control: no-cache\r\nOrigin: chrome-extension://fhbjgbiflinjbdggehcddcbncdddomop\r\nContent-Type: text/plain;charset=UTF-8\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36\r\nPostman-Token: 9503fce9-8b1c-b39c-fb4d-3a7f21b509de\r\nAccept: */*\r\nAccept-Encoding: gzip, deflate\r\nAccept-Language: en-US,en;q=0.8,ru;q=0.6,uk;q=0.4\r\n\r\nApples to Oranges!', ['\x00\x80\x00\x00*', '']]
所以我明白了:
- `'\x00\x80\x00\x00)', ''` 是连接的标识。它最初由 `ZMQ_STREAM` 套接字设置,在随后的请求中似乎不存在。
- `\x00\x80\x00\x00)` 再次是标识,这是我们在来自 `ZMQ_STREAM` 套接字的客户端后续请求中看到的。
- 然后是实际的 HTTP 请求。
但是最后一对幻数 `['\x00\x80\x00\x00*', '']` 到底代表什么?
参考文献:
But the last pair of magic numbers: ['\x00\x80\x00\x00*', '']
What the heck does that stand for?
这是一个新连接,具有新的连接 ID。连接 ID 是一个整数计数器:用 Python 内置函数 `ord` 可以看到 `ord(')') = 41`、`ord('*') = 42`,后者正是序列中的下一个数字。
用 `ZMQ_STREAM` 编写 HTTP 服务器时必须小心,因为这比"建立连接后只接收一条消息"要复杂得多。
问题主要在于,您无法保证一次接收就能拿到完整的请求:body 可能被拆成多条消息、分块到达。您必须解析 HTTP headers,并处理分片接收的 body。
这是一个处理来自 curl 的 POST 请求的示例:
from traceback import print_exc
import zmq
from tornado.httputil import HTTPHeaders
class BadRequest(Exception):
    """A request that this server refuses to process as sent.

    In this file it is raised by ``recv_body`` after it has already replied
    with a 400 response, so the handler only needs to log it.
    """
class ConnectionLost(Exception):
    """The request body could not be completed on the original connection.

    Raised when the client disconnects mid-body, or when data from a
    different connection id arrives while a body is still being read.
    """
def parse_request(request):
    """Parse the request line and headers of an HTTP request head.

    :param request: raw bytes of the request head, i.e. everything before the
        blank line (``b'\\r\\n\\r\\n'``) that separates headers from the body.
    :returns: ``(verb, path, headers)``; ``verb`` and ``path`` are ``str`` and
        ``headers`` is a tornado ``HTTPHeaders`` instance.
    :raises ValueError: if the request line is not exactly three
        whitespace-separated parts (verb, path, protocol), or is not UTF-8.
    """
    # partition (unlike split + 2-tuple unpacking) also accepts a head that
    # has a request line but no header lines, e.g. a bare HTTP/1.0 request.
    first_line, _, header_lines = request.partition(b'\r\n')
    verb, path, _proto = first_line.decode('utf8').split()
    headers = HTTPHeaders.parse(header_lines.decode('utf8', 'replace'))
    return verb, path, headers
def recv_body(socket, headers, chunks, request_id):
    """Receive the rest of a request body from a ZMQ_STREAM socket.

    :param socket: the ``zmq.STREAM`` socket the request arrived on.
    :param headers: parsed request headers (as returned by ``parse_request``).
    :param chunks: list of body bytes already received, e.g. the data that
        followed the headers in the first message; it is extended in place.
    :param request_id: ZMQ connection id of the client that sent the request.
    :returns: the complete body as ``bytes``. Without a ``Content-Length``
        header, whatever is already in ``chunks`` is returned.
    :raises BadRequest: if the client sent ``Expect: 100-continue`` without
        ``Content-Length`` (a 400 response has already been sent).
    :raises ConnectionLost: if the client disconnects, or data from another
        connection arrives, before ``Content-Length`` bytes were received.
    """
    expects_continue = headers.get('expect', '').lower() == '100-continue'
    if 'Content-Length' not in headers:
        if expects_continue:
            # Don't support chunked transfer: http://tools.ietf.org/html/rfc2616#section-3.6.1
            print("Only support specified-length requests")
            socket.send_multipart([
                request_id, b'HTTP/1.1 400 (Bad Request)\r\n\r\n',
                request_id, b'',
            ])
            # Discard frames until an empty data frame (a connect/disconnect
            # notification) arrives.
            while True:
                _, msg = socket.recv_multipart()
                if msg == b'':
                    break
            raise BadRequest("Only support specified-length requests")
        # No declared length: treat what has arrived so far as the body.
        return b''.join(chunks)

    if expects_continue:
        # The client holds the body back until it sees this interim response.
        socket.send_multipart([request_id, b'HTTP/1.1 100 (Continue)\r\n\r\n'], zmq.SNDMORE)

    # Read until Content-Length is satisfied whether or not the client asked
    # for 100-continue: even small bodies may arrive in a separate message.
    content_length = int(headers['Content-Length'])
    received = sum(len(chunk) for chunk in chunks)
    if received < content_length:
        print("Waiting to receive %ikB body" % (content_length // 1024))
    while received < content_length:
        id_, msg = socket.recv_multipart()
        if msg == b'':
            raise ConnectionLost("Disconnected")
        if id_ != request_id:
            raise ConnectionLost("Received data from wrong ID: %s != %s" % (id_, request_id))
        chunks.append(msg)
        received += len(msg)
    return b''.join(chunks)
# Minimal HTTP server loop over a ZMQ_STREAM socket: reads one request per
# connection, prints it, and replies with a bare status line.
print(zmq.__version__, zmq.zmq_version())
socket = zmq.Context().socket(zmq.STREAM)
socket.bind("tcp://*:5555")
while True:
    # Get HTTP request
    request_id, msg = socket.recv_multipart()
    if msg == b'':
        # Empty data frame: a connect/disconnect notification, not a request.
        continue
    chunks = []
    try:
        # The head ends at the first blank line; anything after it is the
        # first piece of the body. If the head itself is split across
        # messages this raises ValueError and is answered with a 500.
        request, first_chunk = msg.split(b'\r\n\r\n', 1)
        if first_chunk:
            chunks.append(first_chunk)
        verb, path, headers = parse_request(request)
        print(verb, path)
        print("Headers:")
        for key, value in headers.items():
            print(' %s: %s' % (key, value))
        body = recv_body(socket, headers, chunks, request_id)
        print("Body: %r" % body)
    except BadRequest as e:
        # recv_body has already replied with 400.
        print("Bad Request: %s" % e)
    except ConnectionLost as e:
        # NOTE(review): no reply is sent here; in the "wrong ID" case the
        # original client is left waiting and the other connection's data is
        # dropped.
        print("Connection Lost: %s" % e)
    except Exception:
        print("Failed to handle request", msg)
        print_exc()
        # Each reply is [id, status] followed by [id, b'']; per the zmq_socket
        # docs, a zero-length data frame closes that TCP connection.
        socket.send_multipart([
            request_id, b'HTTP/1.1 500 (Internal Server Error)\r\n\r\n',
            request_id, b''])
    else:
        socket.send_multipart([
            request_id, b'HTTP/1.1 200 (OK)\r\n\r\n',
            request_id, b''])
这种情况的相关逻辑在 `recv_body` 函数中:它检查 headers,并持续接收 body 的分块,直到接收完整。
坦率地说,我认为在 Python 中用 `ZMQ_STREAM` 编写 HTTP 服务器没有多大意义。您可以把 zmq 套接字集成到现有的 Python 事件循环中,并复用成熟的 HTTP 库,这样就不必重新发明这个轮子。例如,pyzmq 与 tornado 事件循环配合得特别好,您可以在同一个应用程序中同时使用 zmq 套接字和 tornado 的 HTTP 处理程序。
我正在尝试使用 `ZMQ_STREAM` 套接字创建一个 HTTP 服务器。
当我发送一个简单的 `POST` 请求时:
POST / HTTP/1.1
Host: localhost:5555
Cache-Control: no-cache
Postman-Token: 67004be5-56bc-c1a9-847a-7db3195c301d

Apples to Oranges!
这是我用 pyzmq 处理这个问题的方法:
# Question's receive loop: gathers the frames seen for one request into
# `parts` and prints them, to inspect what a ZMQ_STREAM socket delivers.
# NOTE(review): indentation was lost in this copy and has been reconstructed;
# it assumes the second and third recv calls belong to the `if not msg:`
# branch, which is consistent with the printed `parts` list — confirm.
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")
while True:
    # Get HTTP request
    parts = []
    id_, msg = socket.recv_multipart() # [id, ''] or [id, http request]
    parts.append(id_)
    parts.append(msg)
    if not msg:
        # An empty data frame announces a new connection and carries only the
        # connection id; the request itself arrives in the next message,
        # tagged with the same id.
        id_, msg = socket.recv_multipart() # [id, http request]
        parts.append(id_)
        parts.append(msg)
        # Not junk: a *new* id paired with an empty frame is the connect
        # notification of the next connection (the id counter went from
        # ')' == 41 to '*' == 42). recv_multipart blocks by default, so the
        # loop cannot print until another client connects.
        end = socket.recv_multipart() # [new id, ''] <- next connection's connect notification
        parts.append(end)
    print("%s" % repr(parts))
所以 `parts` 列表是:
['\x00\x80\x00\x00)', '', '\x00\x80\x00\x00)', 'POST / HTTP/1.1\r\nHost: localhost:5555\r\nConnection: keep-alive\r\nContent-Length: 18\r\nCache-Control: no-cache\r\nOrigin: chrome-extension://fhbjgbiflinjbdggehcddcbncdddomop\r\nContent-Type: text/plain;charset=UTF-8\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36\r\nPostman-Token: 9503fce9-8b1c-b39c-fb4d-3a7f21b509de\r\nAccept: */*\r\nAccept-Encoding: gzip, deflate\r\nAccept-Language: en-US,en;q=0.8,ru;q=0.6,uk;q=0.4\r\n\r\nApples to Oranges!', ['\x00\x80\x00\x00*', '']]
所以我明白了:
- `'\x00\x80\x00\x00)', ''` 是连接的标识。它最初由 `ZMQ_STREAM` 套接字设置,在随后的请求中似乎不存在。
- `\x00\x80\x00\x00)` 再次是标识,这是我们在来自 `ZMQ_STREAM` 套接字的客户端后续请求中看到的。
- 然后是实际的 HTTP 请求。
但是最后一对幻数 `['\x00\x80\x00\x00*', '']` 到底代表什么?
参考文献:
But the last pair of magic numbers: ['\x00\x80\x00\x00*', ''] What the heck does that stand for?
这是一个新连接,具有新的连接 ID。连接 ID 是一个整数计数器:用 Python 内置函数 `ord` 可以看到 `ord(')') = 41`、`ord('*') = 42`,后者正是序列中的下一个数字。
用 `ZMQ_STREAM` 编写 HTTP 服务器时必须小心,因为这比"建立连接后只接收一条消息"要复杂得多。问题主要在于,您无法保证一次接收就能拿到完整的请求:body 可能被拆成多条消息、分块到达。您必须解析 HTTP headers,并处理分片接收的 body。
这是一个处理来自 curl 的 POST 请求的示例:
from traceback import print_exc
import zmq
from tornado.httputil import HTTPHeaders
class BadRequest(Exception):
    """A request that this server refuses to process as sent.

    In this file it is raised by ``recv_body`` after it has already replied
    with a 400 response, so the handler only needs to log it.
    """
class ConnectionLost(Exception):
    """The request body could not be completed on the original connection.

    Raised when the client disconnects mid-body, or when data from a
    different connection id arrives while a body is still being read.
    """
def parse_request(request):
    """Parse the request line and headers of an HTTP request head.

    :param request: raw bytes of the request head, i.e. everything before the
        blank line (``b'\\r\\n\\r\\n'``) that separates headers from the body.
    :returns: ``(verb, path, headers)``; ``verb`` and ``path`` are ``str`` and
        ``headers`` is a tornado ``HTTPHeaders`` instance.
    :raises ValueError: if the request line is not exactly three
        whitespace-separated parts (verb, path, protocol), or is not UTF-8.
    """
    # partition (unlike split + 2-tuple unpacking) also accepts a head that
    # has a request line but no header lines, e.g. a bare HTTP/1.0 request.
    first_line, _, header_lines = request.partition(b'\r\n')
    verb, path, _proto = first_line.decode('utf8').split()
    headers = HTTPHeaders.parse(header_lines.decode('utf8', 'replace'))
    return verb, path, headers
def recv_body(socket, headers, chunks, request_id):
    """Receive the rest of a request body from a ZMQ_STREAM socket.

    :param socket: the ``zmq.STREAM`` socket the request arrived on.
    :param headers: parsed request headers (as returned by ``parse_request``).
    :param chunks: list of body bytes already received, e.g. the data that
        followed the headers in the first message; it is extended in place.
    :param request_id: ZMQ connection id of the client that sent the request.
    :returns: the complete body as ``bytes``. Without a ``Content-Length``
        header, whatever is already in ``chunks`` is returned.
    :raises BadRequest: if the client sent ``Expect: 100-continue`` without
        ``Content-Length`` (a 400 response has already been sent).
    :raises ConnectionLost: if the client disconnects, or data from another
        connection arrives, before ``Content-Length`` bytes were received.
    """
    expects_continue = headers.get('expect', '').lower() == '100-continue'
    if 'Content-Length' not in headers:
        if expects_continue:
            # Don't support chunked transfer: http://tools.ietf.org/html/rfc2616#section-3.6.1
            print("Only support specified-length requests")
            socket.send_multipart([
                request_id, b'HTTP/1.1 400 (Bad Request)\r\n\r\n',
                request_id, b'',
            ])
            # Discard frames until an empty data frame (a connect/disconnect
            # notification) arrives.
            while True:
                _, msg = socket.recv_multipart()
                if msg == b'':
                    break
            raise BadRequest("Only support specified-length requests")
        # No declared length: treat what has arrived so far as the body.
        return b''.join(chunks)

    if expects_continue:
        # The client holds the body back until it sees this interim response.
        socket.send_multipart([request_id, b'HTTP/1.1 100 (Continue)\r\n\r\n'], zmq.SNDMORE)

    # Read until Content-Length is satisfied whether or not the client asked
    # for 100-continue: even small bodies may arrive in a separate message.
    content_length = int(headers['Content-Length'])
    received = sum(len(chunk) for chunk in chunks)
    if received < content_length:
        print("Waiting to receive %ikB body" % (content_length // 1024))
    while received < content_length:
        id_, msg = socket.recv_multipart()
        if msg == b'':
            raise ConnectionLost("Disconnected")
        if id_ != request_id:
            raise ConnectionLost("Received data from wrong ID: %s != %s" % (id_, request_id))
        chunks.append(msg)
        received += len(msg)
    return b''.join(chunks)
# Minimal HTTP server loop over a ZMQ_STREAM socket: reads one request per
# connection, prints it, and replies with a bare status line.
print(zmq.__version__, zmq.zmq_version())
socket = zmq.Context().socket(zmq.STREAM)
socket.bind("tcp://*:5555")
while True:
    # Get HTTP request
    request_id, msg = socket.recv_multipart()
    if msg == b'':
        # Empty data frame: a connect/disconnect notification, not a request.
        continue
    chunks = []
    try:
        # The head ends at the first blank line; anything after it is the
        # first piece of the body. If the head itself is split across
        # messages this raises ValueError and is answered with a 500.
        request, first_chunk = msg.split(b'\r\n\r\n', 1)
        if first_chunk:
            chunks.append(first_chunk)
        verb, path, headers = parse_request(request)
        print(verb, path)
        print("Headers:")
        for key, value in headers.items():
            print(' %s: %s' % (key, value))
        body = recv_body(socket, headers, chunks, request_id)
        print("Body: %r" % body)
    except BadRequest as e:
        # recv_body has already replied with 400.
        print("Bad Request: %s" % e)
    except ConnectionLost as e:
        # NOTE(review): no reply is sent here; in the "wrong ID" case the
        # original client is left waiting and the other connection's data is
        # dropped.
        print("Connection Lost: %s" % e)
    except Exception:
        print("Failed to handle request", msg)
        print_exc()
        # Each reply is [id, status] followed by [id, b'']; per the zmq_socket
        # docs, a zero-length data frame closes that TCP connection.
        socket.send_multipart([
            request_id, b'HTTP/1.1 500 (Internal Server Error)\r\n\r\n',
            request_id, b''])
    else:
        socket.send_multipart([
            request_id, b'HTTP/1.1 200 (OK)\r\n\r\n',
            request_id, b''])
这种情况的相关逻辑在 `recv_body` 函数中:它检查 headers,并持续接收 body 的分块,直到接收完整。
坦率地说,我认为在 Python 中用 `ZMQ_STREAM` 编写 HTTP 服务器没有多大意义。您可以把 zmq 套接字集成到现有的 Python 事件循环中,并复用成熟的 HTTP 库,这样就不必重新发明这个轮子。例如,pyzmq 与 tornado 事件循环配合得特别好,您可以在同一个应用程序中同时使用 zmq 套接字和 tornado 的 HTTP 处理程序。