使用计时器装饰器触发电视节目方法

Use timer decorators to trigger telethon methods

我想为我的项目制作一个装饰器,每隔 x 秒就会启动某个电视节目任务。

我在电视节目组里问过,有人给了我一个小装饰器,但这里的问题是我需要使用 run_until_complete 启动循环,而我在启动客户端时已经使用了它。这是我的代码:

def periodic(period):
    def scheduler(fcn):
        async def wrapper():

            while True:
                asyncio.ensure_future(fcn())
                await asyncio.sleep(period)

        return wrapper

    return scheduler

@periodic(2)
async def do_something():
    await asyncio.sleep(5)  # Do some heavy calculation
    me = await client.get_me()
    print(me.stingfy())

现在我已经在 main 中有一个循环 运行ning:

if __name__ == "__main__":

async def start():
    await client.start()
    await client.get_me()
    await client.run_until_disconnected()

loop = asyncio.get_event_loop()
loop.run_until_complete(start())

而且我不能 运行 另一个循环,因为如果我这样做,它似乎会关闭这个循环。关于如何使这项工作有任何想法吗?

您似乎想要同时 运行 多项功能。因此,您可以使用 asyncio.Taskasyncio.create_task 创建任务并将它们添加到列表中,然后使用 asyncio.waitetc.

运行 它们
import asyncio

def periodic(period):
    def scheduler(fcn):
        async def wrapper():
            while 1:
                asyncio.ensure_future(fcn())
                await asyncio.sleep(period)
        return wrapper
    return scheduler

@periodic(2)
async def do_something():
    print("Im running")

async def client():
    while 1:
        await asyncio.sleep(1)
        print("This is client")

if __name__ == "__main__":
    async def start():
        task = [asyncio.Task(client()),
                asyncio.Task(do_something())]
        done, pending = await asyncio.wait(task)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(start())