与 uvloop 等效的异步事件循环
asyncio event loop equivalent with uvloop
我有一个使用协程方法的事件循环 asyncio
。
我热衷于使用 uvloop 寻找与以下示例等效的方法。
这是一个简单的 asyncio
事件循环示例:
import asyncio
async def read(**kwargs):
oid = kwargs.get('oid', '0.0.0.0.0.0')
time = kwargs.get('time', 1)
try:
print('start: ' + oid)
except Exception as exc:
print(exc)
finally:
await asyncio.sleep(time)
print('terminate: ' + oid)
def event_loop(configs):
loop = asyncio.get_event_loop()
for conf in configs:
asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))
return loop
if __name__ == '__main__':
snmp_configurations = [
{'time': 5, 'oid': '1.3.6.3.2.4'},
{'time': 6, 'oid': '1.3.6.3.5.8'},
] # TODO :: DUMMY
loop = event_loop(snmp_configurations)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Closing Loop")
loop.close()
问题:
如何使用uvloop改造上面的代码片段?
以下更改是否正确以使用具有更高性能的 uvloop?
import uvloop
def event_loop(configs):
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # TODO :: uvloop.
loop = asyncio.get_event_loop()
` for conf in configs:
asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))
return loop
[注意]:
- uvloop 声称使 asyncio 快 2-4 倍。
只需在调用 asyncio.get_event_loop()
之前设置事件循环策略。
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def read(**kwargs):
oid = kwargs.get('oid', '0.0.0.0.0.0')
time = kwargs.get('time', 1)
try:
print('start: ' + oid)
except Exception as exc:
print(exc)
finally:
await asyncio.sleep(time)
print('terminate: ' + oid)
def event_loop(configs):
loop = asyncio.get_event_loop()
for conf in configs:
asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))
return loop
if __name__ == '__main__':
snmp_configurations = [
{'time': 5, 'oid': '1.3.6.3.2.4'},
{'time': 6, 'oid': '1.3.6.3.5.8'},
] # TODO :: DUMMY
loop = event_loop(snmp_configurations)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Closing Loop")
loop.close()
是的,这段代码是正确的。您可以在导入后设置事件循环策略。
import uvloop
import asyncio
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # TODO :: uvloop.
def event_loop(configs):
loop = asyncio.get_event_loop()
for conf in configs:
asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))
return loop
我有一个使用协程方法的事件循环 asyncio
。
我热衷于使用 uvloop 寻找与以下示例等效的方法。
这是一个简单的 asyncio
事件循环示例:
import asyncio
async def read(**kwargs):
oid = kwargs.get('oid', '0.0.0.0.0.0')
time = kwargs.get('time', 1)
try:
print('start: ' + oid)
except Exception as exc:
print(exc)
finally:
await asyncio.sleep(time)
print('terminate: ' + oid)
def event_loop(configs):
loop = asyncio.get_event_loop()
for conf in configs:
asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))
return loop
if __name__ == '__main__':
snmp_configurations = [
{'time': 5, 'oid': '1.3.6.3.2.4'},
{'time': 6, 'oid': '1.3.6.3.5.8'},
] # TODO :: DUMMY
loop = event_loop(snmp_configurations)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Closing Loop")
loop.close()
问题:
如何使用uvloop改造上面的代码片段?
以下更改是否正确以使用具有更高性能的 uvloop?
import uvloop def event_loop(configs): asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # TODO :: uvloop. loop = asyncio.get_event_loop() ` for conf in configs: asyncio.ensure_future(read(oid=conf['oid'], time=conf['time'])) return loop
[注意]:
- uvloop 声称使 asyncio 快 2-4 倍。
只需在调用 asyncio.get_event_loop()
之前设置事件循环策略。
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def read(**kwargs):
oid = kwargs.get('oid', '0.0.0.0.0.0')
time = kwargs.get('time', 1)
try:
print('start: ' + oid)
except Exception as exc:
print(exc)
finally:
await asyncio.sleep(time)
print('terminate: ' + oid)
def event_loop(configs):
loop = asyncio.get_event_loop()
for conf in configs:
asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))
return loop
if __name__ == '__main__':
snmp_configurations = [
{'time': 5, 'oid': '1.3.6.3.2.4'},
{'time': 6, 'oid': '1.3.6.3.5.8'},
] # TODO :: DUMMY
loop = event_loop(snmp_configurations)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Closing Loop")
loop.close()
是的,这段代码是正确的。您可以在导入后设置事件循环策略。
import uvloop
import asyncio
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # TODO :: uvloop.
def event_loop(configs):
loop = asyncio.get_event_loop()
for conf in configs:
asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))
return loop