Python - 时间表库 - 需要忙循环
Python - Schedule library - Needs to get busy loop
所以我一直在研究日程安排,终于让它开始工作了。我太兴奋了,没意识到又出现了另一个问题哈哈。但是现在的问题是,只要主要完成,它就不会结束,我真的找不到解决方案。我知道问题出在 Time.sleep(1) 行,因为每当我输入 keyboardInterrput 时都会出现错误,说 Time.sleep(1) 是 "Issue",我真的找不到解决方案结束它。
我正在使用 github 的时间表:https://github.com/dbader/schedule
while True:
UserInput = input('To run Schedule task - Press y\nTo run directly - Press n\n')
if(UserInput == 'y' or UserInput == 'Y'):
print(Fore.RESET + space)
TimeUser = input('What time to start script? Format - HH:MM\n')
schedule.every().day.at(TimeUser).do(main)
wipe()
print('Schedule starts at: ''' + TimeUser + ' - Waiting for time...')
idle = int(round(schedule.idle_seconds()))
while True:
schedule.run_pending()
time.sleep(1)
idle = int(round(schedule.idle_seconds()))
if(idle < 6.0) and (idle >= 0.0):
print('Starting in: ' + str(idle))
elif(UserInput == 'n' or UserInput == 'N'):
main()
print("Wrong input - Try again")
我确实从一个非常善良的人那里得到了关于这个的推荐:
调度库需要那个繁忙的循环。这里真正的问题是 OP 如何在不阻塞的情况下 运行 那个繁忙的循环,而该库文档中的答案是:运行 它在另一个线程中。如果他坏了,定时任务将无法完成
而且我仍然不明白在不阻塞 main 的情况下该怎么做。有人知道吗?
在该示例中,它是该模块的标准 Schedule 对象,但扩展了一个额外的方法,您只需调用一次,然后它将 运行 在一个单独的线程中。我建议尝试这样做——最快的方法可能是创建您自己的对象,该对象是 Scheduler 的子类,并在子类上实现 run_continuously()
方法。然后,您可以在调度程序上调用该方法一次,然后在主线程中执行您喜欢的任何其他操作,而无需定期调用 run_pending()
按 Ctrl+C 时出现的错误不是问题 - 它只是抱怨 sleep
在您手动终止时被打断了。如果你想根据某种条件自动退出,你可以根据循环中的某种逻辑来做
例如while not terminate:
其中 terminate 是您设置的变量,可能是全局变量,可以由计划任务更改。
此基于计划的模型的很多实用程序用于后台任务,这些任务 运行 在较长时间内重复进行。假设您想执行更多代码,或者您有一些代码需要 运行 一次并且在您进入 while
循环之前它可能是 运行,或者您想要 运行 重复它,你也可以将它添加到日程表中。
一些例子
import schedule
import threading
import time
# this is a class which uses inheritance to act as a normal Scheduler,
# but also can run_continuously() in another thread
class ContinuousScheduler(schedule.Scheduler):
def run_continuously(self, interval=1):
"""Continuously run, while executing pending jobs at each elapsed
time interval.
@return cease_continuous_run: threading.Event which can be set to
cease continuous run.
Please note that it is *intended behavior that run_continuously()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you set a continuous run interval
of one hour then your job won't be run 60 times at each interval but
only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
# I've extended this a bit by adding self.jobs is None
# now it will stop running if there are no jobs stored on this schedule
while not cease_continuous_run.is_set() and self.jobs:
# for debugging
# print("ccr_flag: {0}, no. of jobs: {1}".format(cease_continuous_run.is_set(), len(self.jobs)))
self.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
# example using this custom scheduler that can be run in a separate thread
your_schedule = ContinuousScheduler()
your_schedule.every().day.do(print)
# it returns a threading.Event when you start it.
halt_schedule_flag = your_schedule.run_continuously()
# you can now do whatever else you like here while that runs
# if your main script doesn't stop the background thread, it will keep running
# and the main script will have to wait forever for it
# if you want to stop it running, just set the flag using set()
halt_schedule_flag.set()
# I've added another way you can stop the schedule to the class above
# if all the jobs are gone it stops, and you can remove all jobs with clear()
your_schedule.clear()
# the third way to empty the schedule is by using Single Run Jobs only
# single run jobs return schedule.CancelJob
def job_that_executes_once():
# Do some work ...
print("I'm only going to run once!")
return schedule.CancelJob
# using a different schedule for this example to avoid some threading issues
another_schedule = ContinuousScheduler()
another_schedule.every(5).seconds.do(job_that_executes_once)
halt_schedule_flag = another_schedule.run_continuously()
我会注意您是否真的需要为此使用线程 - 如果您只是希望程序在执行一次作业后退出,您需要做的就是:
while schedule.jobs:
schedule.run_pending()
time.sleep(1)
并确保你的工作 return CancelJob
。希望这个例子对修改有用,已经在 repl.it 中测试过,一切都应该适用于 Python 3.
所以我一直在研究日程安排,终于让它开始工作了。我太兴奋了,没意识到又出现了另一个问题哈哈。但是现在的问题是,只要主要完成,它就不会结束,我真的找不到解决方案。我知道问题出在 Time.sleep(1) 行,因为每当我输入 keyboardInterrput 时都会出现错误,说 Time.sleep(1) 是 "Issue",我真的找不到解决方案结束它。
我正在使用 github 的时间表:https://github.com/dbader/schedule
while True:
UserInput = input('To run Schedule task - Press y\nTo run directly - Press n\n')
if(UserInput == 'y' or UserInput == 'Y'):
print(Fore.RESET + space)
TimeUser = input('What time to start script? Format - HH:MM\n')
schedule.every().day.at(TimeUser).do(main)
wipe()
print('Schedule starts at: ''' + TimeUser + ' - Waiting for time...')
idle = int(round(schedule.idle_seconds()))
while True:
schedule.run_pending()
time.sleep(1)
idle = int(round(schedule.idle_seconds()))
if(idle < 6.0) and (idle >= 0.0):
print('Starting in: ' + str(idle))
elif(UserInput == 'n' or UserInput == 'N'):
main()
print("Wrong input - Try again")
我确实从一个非常善良的人那里得到了关于这个的推荐: 调度库需要那个繁忙的循环。这里真正的问题是 OP 如何在不阻塞的情况下 运行 那个繁忙的循环,而该库文档中的答案是:运行 它在另一个线程中。如果他坏了,定时任务将无法完成
而且我仍然不明白在不阻塞 main 的情况下该怎么做。有人知道吗?
在该示例中,它是该模块的标准 Schedule 对象,但扩展了一个额外的方法,您只需调用一次,然后它将 运行 在一个单独的线程中。我建议尝试这样做——最快的方法可能是创建您自己的对象,该对象是 Scheduler 的子类,并在子类上实现 run_continuously()
方法。然后,您可以在调度程序上调用该方法一次,然后在主线程中执行您喜欢的任何其他操作,而无需定期调用 run_pending()
按 Ctrl+C 时出现的错误不是问题 - 它只是抱怨 sleep
在您手动终止时被打断了。如果你想根据某种条件自动退出,你可以根据循环中的某种逻辑来做
例如while not terminate:
其中 terminate 是您设置的变量,可能是全局变量,可以由计划任务更改。
此基于计划的模型的很多实用程序用于后台任务,这些任务 运行 在较长时间内重复进行。假设您想执行更多代码,或者您有一些代码需要 运行 一次并且在您进入 while
循环之前它可能是 运行,或者您想要 运行 重复它,你也可以将它添加到日程表中。
一些例子
import schedule
import threading
import time
# this is a class which uses inheritance to act as a normal Scheduler,
# but also can run_continuously() in another thread
class ContinuousScheduler(schedule.Scheduler):
def run_continuously(self, interval=1):
"""Continuously run, while executing pending jobs at each elapsed
time interval.
@return cease_continuous_run: threading.Event which can be set to
cease continuous run.
Please note that it is *intended behavior that run_continuously()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you set a continuous run interval
of one hour then your job won't be run 60 times at each interval but
only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
# I've extended this a bit by adding self.jobs is None
# now it will stop running if there are no jobs stored on this schedule
while not cease_continuous_run.is_set() and self.jobs:
# for debugging
# print("ccr_flag: {0}, no. of jobs: {1}".format(cease_continuous_run.is_set(), len(self.jobs)))
self.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
# example using this custom scheduler that can be run in a separate thread
your_schedule = ContinuousScheduler()
your_schedule.every().day.do(print)
# it returns a threading.Event when you start it.
halt_schedule_flag = your_schedule.run_continuously()
# you can now do whatever else you like here while that runs
# if your main script doesn't stop the background thread, it will keep running
# and the main script will have to wait forever for it
# if you want to stop it running, just set the flag using set()
halt_schedule_flag.set()
# I've added another way you can stop the schedule to the class above
# if all the jobs are gone it stops, and you can remove all jobs with clear()
your_schedule.clear()
# the third way to empty the schedule is by using Single Run Jobs only
# single run jobs return schedule.CancelJob
def job_that_executes_once():
# Do some work ...
print("I'm only going to run once!")
return schedule.CancelJob
# using a different schedule for this example to avoid some threading issues
another_schedule = ContinuousScheduler()
another_schedule.every(5).seconds.do(job_that_executes_once)
halt_schedule_flag = another_schedule.run_continuously()
我会注意您是否真的需要为此使用线程 - 如果您只是希望程序在执行一次作业后退出,您需要做的就是:
while schedule.jobs:
schedule.run_pending()
time.sleep(1)
并确保你的工作 return CancelJob
。希望这个例子对修改有用,已经在 repl.it 中测试过,一切都应该适用于 Python 3.