如何在不阻塞程序的情况下为命令制作计时器
How can I make a timer for a command without blocking the program
我正在尝试添加一个命令,该命令在输入时会在几秒钟后发回一条消息。我找到了一个解决方案,但它会阻止程序,因此在计时器结束之前其他用户无法使用该机器人。
我也尝试过使用后台任务,但它一直告诉我它没有 'start' 方法。
编辑:该代码只是一个示例,我希望能够在等待时执行其他代码(pass 的去向)
这是我的齿轮代码:
import discord
from discord.ext import tasks, commands
import time
class archive(commands.Cog):
def __init__(self,bot):
self.bot=bot
@commands.Cog.listener()
async def on_ready(self):
print("Loaded 'Archive' module")
@commands.command()
async def test(self,ctx):
target_time=time.time()+5
while time.time()<target_time:
pass
await ctx.send("5 seconds have passed")
def setup(bot):
bot.add_cog(archive(bot))
如果你只想等待,你需要使用asyncio.sleep,这样其他异步代码就可以在等待时执行:
import asyncio
@commands.command()
async def test(self, ctx):
await asyncio.sleep(5)
await ctx.send("5 seconds have passed")
(您还需要为 ctx.send
使用 await,因为它是协程)
编辑:如果你想让你的测试命令 运行 一些代码但在 X 秒后超时,那么你需要使用 asyncio.wait_for:
from random import random
import asyncio
async def my_task():
for i in range(10):
print(f"randomly long task: {i}/10")
await asyncio.sleep(random())
@commands.command()
async def test(self, ctx):
try:
await asyncio.wait_for(my_task(), timeout=5)
await ctx.send("Task successful")
except asyncio.TimeoutError:
await ctx.send("Task timed out (5 seconds)")
我正在尝试添加一个命令,该命令在输入时会在几秒钟后发回一条消息。我找到了一个解决方案,但它会阻止程序,因此在计时器结束之前其他用户无法使用该机器人。
我也尝试过使用后台任务,但它一直告诉我它没有 'start' 方法。
编辑:该代码只是一个示例,我希望能够在等待时执行其他代码(pass 的去向)
这是我的齿轮代码:
import discord
from discord.ext import tasks, commands
import time
class archive(commands.Cog):
def __init__(self,bot):
self.bot=bot
@commands.Cog.listener()
async def on_ready(self):
print("Loaded 'Archive' module")
@commands.command()
async def test(self,ctx):
target_time=time.time()+5
while time.time()<target_time:
pass
await ctx.send("5 seconds have passed")
def setup(bot):
bot.add_cog(archive(bot))
如果你只想等待,你需要使用asyncio.sleep,这样其他异步代码就可以在等待时执行:
import asyncio
@commands.command()
async def test(self, ctx):
await asyncio.sleep(5)
await ctx.send("5 seconds have passed")
(您还需要为 ctx.send
使用 await,因为它是协程)
编辑:如果你想让你的测试命令 运行 一些代码但在 X 秒后超时,那么你需要使用 asyncio.wait_for:
from random import random
import asyncio
async def my_task():
for i in range(10):
print(f"randomly long task: {i}/10")
await asyncio.sleep(random())
@commands.command()
async def test(self, ctx):
try:
await asyncio.wait_for(my_task(), timeout=5)
await ctx.send("Task successful")
except asyncio.TimeoutError:
await ctx.send("Task timed out (5 seconds)")