如何通过python-telegram-bot命令创建一个随时可以started/stopped的循环线程?
How to create a cyclic thread that can be started/stopped at any time via python-telegram-bot command?
我正在编写一个基于命令的 Telegram 机器人(使用 python-telegram-bot),每小时循环向用户发送消息。
我想 start/stop 使用机器人命令,添加命令处理程序,如 /start_cycle
和 /stop_cycle
。澄清一下,这就是我的想法:
def start_cycle()
# start in some way send_hourly_message()
def stop_cycle()
# stop in some way send_hourly_message()
def main():
"""Entrypoint of the bot"""
# Create updater and get dispatcher
updater = Updater(...)
dp = updater.dispatcher
# Add command handlers
dp.add_handler(CommandHandler("start_cycle", start_cycle))
dp.add_handler(CommandHandler("stop_cycle", stop_cycle))
# Start the bot until interrupt
updater.start_polling(timeout=3)
updater.idle()
让我疑惑的是,Telegram库是如何构思的,已经有基于事件的逻辑,由updater.start_polling()
和updater.idle()
开始。我没有找到任何关于如何使用可触发的基于时间的事件使其正常工作的 documentation/specific 信息。
在您看来,实现我的想法的最佳方式是什么?我稍微研究了一下 asyncio,但对于我实际需要的东西来说可能太复杂了?
提前感谢您的任何建议!
感谢@GaganTK,我能够找到我需要的东西:
def start_notify(update, context):
new_job = context.job_queue.run_repeating(my_callback, interval=3, first=0, name="my_job")
def stop_notify(update, context):
job = context.job_queue.get_jobs_by_name("my_job")
job[0].schedule_removal()
def my_callback(context: telegram.ext.CallbackContext):
print(datetime.datetime.now())
我正在编写一个基于命令的 Telegram 机器人(使用 python-telegram-bot),每小时循环向用户发送消息。
我想 start/stop 使用机器人命令,添加命令处理程序,如 /start_cycle
和 /stop_cycle
。澄清一下,这就是我的想法:
def start_cycle()
# start in some way send_hourly_message()
def stop_cycle()
# stop in some way send_hourly_message()
def main():
"""Entrypoint of the bot"""
# Create updater and get dispatcher
updater = Updater(...)
dp = updater.dispatcher
# Add command handlers
dp.add_handler(CommandHandler("start_cycle", start_cycle))
dp.add_handler(CommandHandler("stop_cycle", stop_cycle))
# Start the bot until interrupt
updater.start_polling(timeout=3)
updater.idle()
让我疑惑的是,Telegram库是如何构思的,已经有基于事件的逻辑,由updater.start_polling()
和updater.idle()
开始。我没有找到任何关于如何使用可触发的基于时间的事件使其正常工作的 documentation/specific 信息。
在您看来,实现我的想法的最佳方式是什么?我稍微研究了一下 asyncio,但对于我实际需要的东西来说可能太复杂了?
提前感谢您的任何建议!
感谢@GaganTK,我能够找到我需要的东西:
def start_notify(update, context):
new_job = context.job_queue.run_repeating(my_callback, interval=3, first=0, name="my_job")
def stop_notify(update, context):
job = context.job_queue.get_jobs_by_name("my_job")
job[0].schedule_removal()
def my_callback(context: telegram.ext.CallbackContext):
print(datetime.datetime.now())