AsyncIO python3.4 休息服务器
AsyncIO python3.4 restserver
我正在尝试使用 python 3.4 中的 asyncio 模块实现一个简单的 restful 服务器,但无法访问我定义的端点。当我在 URL(http://127.0.0.1:8080/v1/queue 上执行 CURL(GET 请求)时,我从服务器得到一个空响应。我认为我的基本 class 有一些问题asyncio 的设置。有人可以指出吗?CURL 是否需要以使用 aiohttp 发送请求的异步方式实现?
这是我的代码:
4 import asyncio
5 import aiorest
6 import json
8
10
11 class Sample(aiorest.RESTServer):
12
13 def _init__(self, *args, **kwargs):
14 super().__init__(*args, **kwargs)
15 self.add_url('GET', '/v1/queue', self.get_handler)
16 self.add_url('POST', '/v1/stack', self.post_handler)
19 def get_handler(self, request):
20 return {'Welcome': 'user'}
21
22 def post_handler(self, request, payload):
23 if not payload:
24 return {'error': 'Payload not supplied'}
25
26 try:
27 json_payload = json.loads(payload)
28 except ValueError:
29 return {'error': 'Invalid JSON supplied'}
30 print('Payload received {0}'.format(json_payload))
32 return {'Payload Received Success'}
33
34
35 def main():
36 loop = asyncio.get_event_loop()
37 server = Sample(hostname='127.0.0.1', loop=loop)
38 srv = loop.run_until_complete(loop.create_server(
39 server.make_handler, '127.0.0.1', 8080))
40 print('Server listening on port 8080')
44 try:
45 loop.run_forever()
46 except KeyboardInterrupt:
47 pass
48 finally:
49 srv.close()
50 loop.run_until_complete(srv.wait_closed())
51 loop.close()
52
53
54 if __name__ == '__main__':
55 main()
代码不工作的原因是示例 class 构造函数 _init__
而不是 __init__
中的简单拼写错误。修复此问题后,一切都应按预期工作。
编辑
处理程序方法应该只需要 request
参数,正文可以从传递的 request
.
中获取
def post_handler(self, request):
payload = request._request_body.decode('utf-8')
if not payload:
return {'error': 'Payload not supplied'}
try:
json_payload = json.loads(payload)
except ValueError:
return {'error': 'Invalid JSON supplied'}
print('Payload received {0}'.format(json_payload))
return {'result': 'Payload Received Success'}
提示
我正在尝试使用 python 3.4 中的 asyncio 模块实现一个简单的 restful 服务器,但无法访问我定义的端点。当我在 URL(http://127.0.0.1:8080/v1/queue 上执行 CURL(GET 请求)时,我从服务器得到一个空响应。我认为我的基本 class 有一些问题asyncio 的设置。有人可以指出吗?CURL 是否需要以使用 aiohttp 发送请求的异步方式实现?
这是我的代码:
4 import asyncio
5 import aiorest
6 import json
8
10
11 class Sample(aiorest.RESTServer):
12
13 def _init__(self, *args, **kwargs):
14 super().__init__(*args, **kwargs)
15 self.add_url('GET', '/v1/queue', self.get_handler)
16 self.add_url('POST', '/v1/stack', self.post_handler)
19 def get_handler(self, request):
20 return {'Welcome': 'user'}
21
22 def post_handler(self, request, payload):
23 if not payload:
24 return {'error': 'Payload not supplied'}
25
26 try:
27 json_payload = json.loads(payload)
28 except ValueError:
29 return {'error': 'Invalid JSON supplied'}
30 print('Payload received {0}'.format(json_payload))
32 return {'Payload Received Success'}
33
34
35 def main():
36 loop = asyncio.get_event_loop()
37 server = Sample(hostname='127.0.0.1', loop=loop)
38 srv = loop.run_until_complete(loop.create_server(
39 server.make_handler, '127.0.0.1', 8080))
40 print('Server listening on port 8080')
44 try:
45 loop.run_forever()
46 except KeyboardInterrupt:
47 pass
48 finally:
49 srv.close()
50 loop.run_until_complete(srv.wait_closed())
51 loop.close()
52
53
54 if __name__ == '__main__':
55 main()
代码不工作的原因是示例 class 构造函数 _init__
而不是 __init__
中的简单拼写错误。修复此问题后,一切都应按预期工作。
编辑
处理程序方法应该只需要 request
参数,正文可以从传递的 request
.
def post_handler(self, request):
payload = request._request_body.decode('utf-8')
if not payload:
return {'error': 'Payload not supplied'}
try:
json_payload = json.loads(payload)
except ValueError:
return {'error': 'Invalid JSON supplied'}
print('Payload received {0}'.format(json_payload))
return {'result': 'Payload Received Success'}
提示