Python's asyncio works synchronously
I'm trying to use Python's new asynchronous libraries to send asynchronous HTTP requests. I want to wait a few milliseconds (the timeout variable) before sending each request, but, of course, send them asynchronously instead of waiting for a response after each request is sent.

I'm doing something like the following:
@asyncio.coroutine
def handle_line(self, line, destination):
    print("Inside! line {} destination {}".format(line, destination))
    response = yield from aiohttp.request('POST', destination, data=line,
                                          headers=tester.headers)
    print(response.status)
    return (yield from response.read())

@asyncio.coroutine
def send_data(self, filename, timeout):
    destination = 'foo'
    logging.log(logging.DEBUG, 'sending_data')
    with open(filename) as log_file:
        for line in log_file:
            try:
                json_event = json.loads(line)
            except ValueError as e:
                print("Error parsing json event")
            time.sleep(timeout)
            yield from asyncio.async(self.handle_line(json.dumps(json_event), destination))

loop = asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))
The output I get (by printing the 200 responses) makes it look like this code runs synchronously. What am I doing wrong?
There are a couple of issues here:

You should use asyncio.sleep rather than time.sleep, because the latter blocks the event loop.
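The difference is easy to demonstrate; here's a minimal sketch in modern async/await syntax (equivalent to the question's @asyncio.coroutine/yield from style). Two tasks each sleep 0.2 seconds, and because asyncio.sleep yields control back to the event loop, they overlap; replacing it with time.sleep would roughly double the total time:

```python
import asyncio
import time

async def tick(i):
    # asyncio.sleep suspends only this coroutine and hands control back
    # to the event loop, so other tasks keep running in the meantime
    await asyncio.sleep(0.2)
    return i

async def main():
    start = time.monotonic()
    # the two sleeps overlap: total elapsed time is ~0.2s, not 0.4s
    results = await asyncio.gather(tick(1), tick(2))
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
print(results, elapsed)
```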
You shouldn't use yield from on the asyncio.async(self.handle_line(...)) call, because that makes the script block until the self.handle_line coroutine is complete, which means you never end up doing anything concurrently; you process one line, wait for that processing to finish, then move on to the next line. Instead, you should make all of the asyncio.async calls without waiting on them, save the returned Task objects to a list, and then use asyncio.wait to wait for them all to finish once they've all been started.
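That schedule-then-wait pattern in isolation looks like this; a minimal sketch using asyncio.ensure_future (the current spelling of asyncio.async, which is now a removed alias) and a stand-in coroutine instead of the question's HTTP request:

```python
import asyncio

async def handle(i):
    # stand-in for the real per-line work (an HTTP request in the question)
    await asyncio.sleep(0.05)
    return i * 2

async def main():
    tasks = []
    for i in range(3):
        # schedule the coroutine without waiting on it; the loop body
        # continues immediately to the next iteration
        tasks.append(asyncio.ensure_future(handle(i)))
    # block once, here, until every scheduled Task is done
    done, pending = await asyncio.wait(tasks)
    return sorted(t.result() for t in done)

print(asyncio.run(main()))
```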
Putting it all together:
@asyncio.coroutine
def handle_line(self, line, destination):
    print("Inside! line {} destination {}".format(line, destination))
    response = yield from aiohttp.request('POST', destination, data=line,
                                          headers=tester.headers)
    print(response.status)
    return (yield from response.read())

@asyncio.coroutine
def send_data(self, filename, timeout):
    destination = 'foo'
    logging.log(logging.DEBUG, 'sending_data')
    tasks = []
    with open(filename) as log_file:
        for line in log_file:
            try:
                json_event = json.loads(line)
            except ValueError:
                print("Error parsing json event")
                continue  # skip lines that aren't valid JSON
            yield from asyncio.sleep(timeout)
            tasks.append(asyncio.async(
                self.handle_line(json.dumps(json_event), destination)))
    yield from asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))
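For reference, @asyncio.coroutine and asyncio.async have since been removed from Python (in 3.11 and 3.10 respectively), so on current versions the same pattern is written with async/await, asyncio.create_task, and asyncio.gather. A self-contained sketch, with the aiohttp POST replaced by a stand-in sleep and the file replaced by an in-memory list of lines so it runs as-is:

```python
import asyncio
import json

async def handle_line(line, destination):
    # stand-in for the aiohttp POST in the answer above
    await asyncio.sleep(0.1)
    return (destination, line)

async def send_data(lines, timeout):
    tasks = []
    for line in lines:
        try:
            json_event = json.loads(line)
        except ValueError:
            continue  # skip lines that aren't valid JSON
        await asyncio.sleep(timeout)
        # create_task schedules the coroutine without waiting on it
        tasks.append(asyncio.create_task(
            handle_line(json.dumps(json_event), 'foo')))
    # gather waits for all tasks and returns their results in order
    return await asyncio.gather(*tasks)

results = asyncio.run(send_data(['{"a": 1}', 'not json', '{"b": 2}'], 0.01))
print(results)
```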