AIORedis 和 PUB/SUB 不是 asnyc
AIORedis and PUB/SUB aren't asnyc
我使用 aioredis 编写异步服务,它将在特定频道上侦听,并 运行 一些异步方式的命令。
基本上我从 examples page 中获取代码来编写一个小的测试应用程序并删除了不必要的部分:
import asyncio
import aioredis
async def reader(ch):
while (await ch.wait_message()):
msg = await ch.get_json()
print('Got Message:', msg)
i = int(msg['sleep_for'])
print('Sleep for {}'.format(i))
await asyncio.sleep(i)
print('End sleep')
async def main():
sub = await aioredis.create_redis(('localhost', 6379))
res = await sub.subscribe('chan:1')
ch1 = res[0]
tsk = await reader(ch1)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
还有另一个测试应用程序,它使用 sleep_for
字段发布 json blob,然后在订阅者应用程序中使用该字段来模拟 reader
协同程序中的一些工作sleep
声明。
我预计 "sleeps" 到 运行 "parallel" 但实际上它们以同步方式出现在屏幕上,只是一个接一个。
我的猜测是,只要点击 await ch.get_json(..)
(或者甚至 await ch.wait_message()
)行,我就应该能够处理下一条消息。在实践中,它 运行 就像一个同步代码。我哪里错了?这可以使用连接池来处理,但这意味着有些东西不是异步的,也不知道到底是什么。
My guess was that as soon as hit the await ch.get_json(..) (or maybe even await ch.wait_message()) line I should be able to handle next message.
async/await
语法不是这样工作的。每次你在协程中点击 await
时,协程将是 "paused",将控制权交给被调用的协程。如果它正在休眠,它不会自动处理下一条消息。
你应该做的是使用 ensure_future
在单独的协程中处理每条消息:
import asyncio
import aioredis
async def handle_msg(msg):
print('Got Message:', msg)
i = int(msg['sleep_for'])
print('Sleep for {}'.format(i))
await asyncio.sleep(i)
print('End sleep')
async def reader(ch):
while (await ch.wait_message()):
msg = await ch.get_json()
asyncio.ensure_future(handle_msg(msg))
async def main():
sub = await aioredis.create_redis(('localhost', 6379))
res = await sub.subscribe('chan:1')
ch1 = res[0]
tsk = await reader(ch1)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
我使用 aioredis 编写异步服务,它将在特定频道上侦听,并 运行 一些异步方式的命令。
基本上我从 examples page 中获取代码来编写一个小的测试应用程序并删除了不必要的部分:
import asyncio
import aioredis
async def reader(ch):
while (await ch.wait_message()):
msg = await ch.get_json()
print('Got Message:', msg)
i = int(msg['sleep_for'])
print('Sleep for {}'.format(i))
await asyncio.sleep(i)
print('End sleep')
async def main():
sub = await aioredis.create_redis(('localhost', 6379))
res = await sub.subscribe('chan:1')
ch1 = res[0]
tsk = await reader(ch1)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
还有另一个测试应用程序,它使用 sleep_for
字段发布 json blob,然后在订阅者应用程序中使用该字段来模拟 reader
协同程序中的一些工作sleep
声明。
我预计 "sleeps" 到 运行 "parallel" 但实际上它们以同步方式出现在屏幕上,只是一个接一个。
我的猜测是,只要点击 await ch.get_json(..)
(或者甚至 await ch.wait_message()
)行,我就应该能够处理下一条消息。在实践中,它 运行 就像一个同步代码。我哪里错了?这可以使用连接池来处理,但这意味着有些东西不是异步的,也不知道到底是什么。
My guess was that as soon as hit the await ch.get_json(..) (or maybe even await ch.wait_message()) line I should be able to handle next message.
async/await
语法不是这样工作的。每次你在协程中点击 await
时,协程将是 "paused",将控制权交给被调用的协程。如果它正在休眠,它不会自动处理下一条消息。
你应该做的是使用 ensure_future
在单独的协程中处理每条消息:
import asyncio
import aioredis
async def handle_msg(msg):
print('Got Message:', msg)
i = int(msg['sleep_for'])
print('Sleep for {}'.format(i))
await asyncio.sleep(i)
print('End sleep')
async def reader(ch):
while (await ch.wait_message()):
msg = await ch.get_json()
asyncio.ensure_future(handle_msg(msg))
async def main():
sub = await aioredis.create_redis(('localhost', 6379))
res = await sub.subscribe('chan:1')
ch1 = res[0]
tsk = await reader(ch1)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()