在 Discord.py 上使用 MongoDB 进行计划任务
Using MongoDB on Discord.py for scheduled tasks
目的
我有一些命令,比如 temp-mute、temp-ban 和其他需要在命令执行后完成的命令,我确实需要安排一些事情,比如赠品,并在订阅结束后立即触发一个功能,来自命令。
我想要什么?
我想把我所有的时间和东西都存储在MongoDB中,然后让它在需要触发功能的时候触发。我目前使用 await asyncio.sleep(seconds)
,但是当我重新启动机器人时它会停止,或者如果机器人离线,我希望该功能在时间过去后一上线就触发,或者我希望它按时触发即使在机器人重启后。
您可以使用 @tasks.loop()
.
from pymongo import MongoClient
cluster = MongoClient("mongo_url_here")
collection = cluster["name"]
这可能是您描述该系列的方式。现在,当您临时禁止或临时静音某人时,您需要保存最后的时间。你可以这样做,
current_time = datetime.datetime.now()
final_time = current_time + datetime.timedelta(seconds=seconds_here)
然后将 final_time
保存到数据库中。
现在您需要创建一个 @tasks.loop(seconds=x)
x 表示“每隔 x 秒后 @tasks.loop()
中的函数将 运行
@tasks.loop(seconds=10)
async def checker(self):
try:
all = collection.find({}) # return all documents inside the db
current_time = datetime.datetime.now()
async for x in all:
if current >= x["Time"]: # do stuff after this
else:
pass
except Exception:
pass
x["Time"]
-> 这可以是存储 final_time
的文档中的变量
{"id" : id , "Time" : final_time} -> 像这样
现在,
async def checker(self):
-> Checker 是一个函数,你需要启动它。看你有没有用cog
If not -> checker.start()
# 代码中的任何地方开始
如果是cog -> self.checker.start()
# 应该放在`init
里面
目的
我有一些命令,比如 temp-mute、temp-ban 和其他需要在命令执行后完成的命令,我确实需要安排一些事情,比如赠品,并在订阅结束后立即触发一个功能,来自命令。
我想要什么?
我想把我所有的时间和东西都存储在MongoDB中,然后让它在需要触发功能的时候触发。我目前使用 await asyncio.sleep(seconds)
,但是当我重新启动机器人时它会停止,或者如果机器人离线,我希望该功能在时间过去后一上线就触发,或者我希望它按时触发即使在机器人重启后。
您可以使用 @tasks.loop()
.
from pymongo import MongoClient
cluster = MongoClient("mongo_url_here")
collection = cluster["name"]
这可能是您描述该系列的方式。现在,当您临时禁止或临时静音某人时,您需要保存最后的时间。你可以这样做,
current_time = datetime.datetime.now()
final_time = current_time + datetime.timedelta(seconds=seconds_here)
然后将 final_time
保存到数据库中。
现在您需要创建一个 @tasks.loop(seconds=x)
x 表示“每隔 x 秒后 @tasks.loop()
中的函数将 运行
@tasks.loop(seconds=10)
async def checker(self):
try:
all = collection.find({}) # return all documents inside the db
current_time = datetime.datetime.now()
async for x in all:
if current >= x["Time"]: # do stuff after this
else:
pass
except Exception:
pass
x["Time"]
-> 这可以是存储 final_time
{"id" : id , "Time" : final_time} -> 像这样
现在,
async def checker(self):
-> Checker 是一个函数,你需要启动它。看你有没有用cog
If not -> checker.start()
# 代码中的任何地方开始
如果是cog -> self.checker.start()
# 应该放在`init