cog 中 @tasks.loop 的多个实例
Multiple instances of @tasks.loop in cog
我正在开发一个可以在 reddit 上抓取帖子的 Discord 机器人。现在我创建了一个命令,这样我就可以使用@tasks.loop(time=y) 每 y 时间获取 x 数量的帖子。在 cog 中构建命令,将其加载到 bot 中,它按预期工作。
现在我的问题是,据我所知,此命令一次只能 运行 一个实例。也就是说,如果我 运行 服务器中的命令它会正常工作,但我将无法 运行 命令进入另一个服务器,直到我 .cancel() 运行ning 实例。
我想知道是否有办法让多个实例 运行ning。比如命名循环或其他东西的方法,或者我是否应该使用 Discord.py 的另一个功能来实现我正在尝试做的事情。
编辑:
为了简单起见,我在代码中省略了一些参数,但这里是有问题的代码
def launch_client():
bot = Bot()
reddit = client.Client()
bot.add_cog(ScheduleCog(bot, reddit))
class ScheduleCog(commands.Cog):
time_unit = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 604800}
def __init__(self, bot, reddit):
self.bot = bot
self.reddit = reddit
def cog_unload(self):
self.schedule_loop.cancel()
@tasks.loop(seconds=1)
async def schedule_loop(self, ctx, sub, sort, limit, time):
posts = await self.reddit.get_posts(sub, sort, limit, time)
if posts is not None:
for post in posts:
await ctx.channel.send(post.url)
else:
await ctx.channel.send('Error in your request. Please validate fields.')
@schedule_loop.before_loop
async def before_schedule_loop(self):
await self.bot.wait_until_ready()
@commands.command()
async def subschedule(self, ctx, interval: int, interval_unit: str, sub: str, sort: str, limit: int,
time: str = 'hot'):
unit: int = self.time_unit[interval_unit]
self.schedule_loop.change_interval(seconds=interval * unit)
self.schedule_loop.start(ctx, sub, sort, limit, time)
@commands.command()
async def subschedulestop(self, ctx):
self.schedule_loop.cancel()
所以我把这个协议写给了官方discord.ext.tasks documentation.
现在循环一切正常。我启动 subschedule() 命令,它每隔 'x' 间隔 'y' 时间单位发送帖子。
现在正如原始问题中提到的那样,它在单个实例中工作,我不知道如何拥有多个实例。我希望能够在不同公会中 运行 多个 subschedule() 实例,甚至在同一个公会中使用多个 subschedule() 实例,但是当我再次调用该命令时它现在可以正常工作,我收到运行时错误 'Task is already launched and is not completed'。
我只是想知道能够 运行 并行执行多个任务的正确方法。
如错误所述,如果已经 运行,则无法再次启动相同的循环,但您可以动态创建 tasks.Loop
的新实例
class TaskHandler(commands.Cog):
def __init__(self, bot):
self.bot = bot
self._tasks = [] # Empty list, holds all the loops. You can also use a dict if you want to differentiate the tasks somehow
async def static_loop(self, *args):
# This is the function that will be 'looping'
print(args)
def task_launcher(self, *args, **interval): # The `args` are the arguments passed into the loop
"""Creates new instances of `tasks.Loop`"""
# Creating the task
new_task = tasks.loop(**interval)(self.static_loop) # You can also pass a static interval and/or count
# Starting the task
new_task.start(*args)
self._tasks.append(new_task)
@commands.command()
async def start_task(self, ctx, *args):
"""Command that launches a new task with the arguments given"""
self.task_launcher(*args, seconds=1)
await ctx.send('Task started!')
如果您还想支持 before/after_loop
new_task = tasks.loop(**interval)(self.static_loop) # You can also pass a static interval and/or count
# Adding before/after_loop functions
new_task.before_loop(BEFORE_LOOP_FUNCTION)
new_task.after_loop(AFTER_LOOP_FUNCTION)
我正在开发一个可以在 reddit 上抓取帖子的 Discord 机器人。现在我创建了一个命令,这样我就可以使用@tasks.loop(time=y) 每 y 时间获取 x 数量的帖子。在 cog 中构建命令,将其加载到 bot 中,它按预期工作。
现在我的问题是,据我所知,此命令一次只能 运行 一个实例。也就是说,如果我 运行 服务器中的命令它会正常工作,但我将无法 运行 命令进入另一个服务器,直到我 .cancel() 运行ning 实例。
我想知道是否有办法让多个实例 运行ning。比如命名循环或其他东西的方法,或者我是否应该使用 Discord.py 的另一个功能来实现我正在尝试做的事情。
编辑: 为了简单起见,我在代码中省略了一些参数,但这里是有问题的代码
def launch_client():
bot = Bot()
reddit = client.Client()
bot.add_cog(ScheduleCog(bot, reddit))
class ScheduleCog(commands.Cog):
time_unit = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 604800}
def __init__(self, bot, reddit):
self.bot = bot
self.reddit = reddit
def cog_unload(self):
self.schedule_loop.cancel()
@tasks.loop(seconds=1)
async def schedule_loop(self, ctx, sub, sort, limit, time):
posts = await self.reddit.get_posts(sub, sort, limit, time)
if posts is not None:
for post in posts:
await ctx.channel.send(post.url)
else:
await ctx.channel.send('Error in your request. Please validate fields.')
@schedule_loop.before_loop
async def before_schedule_loop(self):
await self.bot.wait_until_ready()
@commands.command()
async def subschedule(self, ctx, interval: int, interval_unit: str, sub: str, sort: str, limit: int,
time: str = 'hot'):
unit: int = self.time_unit[interval_unit]
self.schedule_loop.change_interval(seconds=interval * unit)
self.schedule_loop.start(ctx, sub, sort, limit, time)
@commands.command()
async def subschedulestop(self, ctx):
self.schedule_loop.cancel()
所以我把这个协议写给了官方discord.ext.tasks documentation.
现在循环一切正常。我启动 subschedule() 命令,它每隔 'x' 间隔 'y' 时间单位发送帖子。
现在正如原始问题中提到的那样,它在单个实例中工作,我不知道如何拥有多个实例。我希望能够在不同公会中 运行 多个 subschedule() 实例,甚至在同一个公会中使用多个 subschedule() 实例,但是当我再次调用该命令时它现在可以正常工作,我收到运行时错误 'Task is already launched and is not completed'。
我只是想知道能够 运行 并行执行多个任务的正确方法。
如错误所述,如果已经 运行,则无法再次启动相同的循环,但您可以动态创建 tasks.Loop
class TaskHandler(commands.Cog):
def __init__(self, bot):
self.bot = bot
self._tasks = [] # Empty list, holds all the loops. You can also use a dict if you want to differentiate the tasks somehow
async def static_loop(self, *args):
# This is the function that will be 'looping'
print(args)
def task_launcher(self, *args, **interval): # The `args` are the arguments passed into the loop
"""Creates new instances of `tasks.Loop`"""
# Creating the task
new_task = tasks.loop(**interval)(self.static_loop) # You can also pass a static interval and/or count
# Starting the task
new_task.start(*args)
self._tasks.append(new_task)
@commands.command()
async def start_task(self, ctx, *args):
"""Command that launches a new task with the arguments given"""
self.task_launcher(*args, seconds=1)
await ctx.send('Task started!')
如果您还想支持 before/after_loop
new_task = tasks.loop(**interval)(self.static_loop) # You can also pass a static interval and/or count
# Adding before/after_loop functions
new_task.before_loop(BEFORE_LOOP_FUNCTION)
new_task.after_loop(AFTER_LOOP_FUNCTION)