Python 从开始时间起 n 分钟后循环到 运行
Python loop to run after n minutes from start time
我正在尝试创建一个 while 循环,它将在 2 个时间对象之间迭代,while datetime.datetime.now().time() <= datetime.datetime.now() +relativedelta(hour=1):
但是每隔 n 分钟或第二个时间间隔。因此,如果开始时间是 1:00 AM,则下一次迭代应在 1:05 AM 开始,n 为 5 分钟。所以迭代应该在开始时间的 5 分钟后开始,而不是例如,从迭代结束开始,这是使用 sleep
时的情况。您能告诉我如何实现吗?
一个可能的解决方案来自这里:
write python script that is executed every 5 minutes
import schedule
import time
def func():
print("this is python")
schedule.every(5).minutes.do(func)
while True:
schedule.run_pending()
time.sleep(1)
有了这个,开始时间必须是凌晨 1 点。其次,如果程序需要 运行 说 5 分钟 + 1 怎么办。在那种情况下,6 分钟的间隔将不起作用。
你试过了吗time.sleep(300)
其中 300 是秒。
如果您希望您的程序每 5 分钟 运行,您可以使用 time.sleep
import time
while true:
#program
time.sleep(300)
如果您想在日期之间进行迭代,请使用此模板:
from datetime import timedelta
start_date = date_utils.parse('2021-01-01')
end_date = datetime.datetime.now()
while start_date <= end_date:
one_hour = timedelta(hours=1)
one_minute = timedelta(minutes=1)
start_date = start_date + datetime.timedelta(days=1)
虽然 schedule
库有很多功能,但我认为下面的代码将帮助您获得您想要的。您可以简单地更改 start_time
、relativedelta
和 iteration_time
import time
import datetime
start_time = datetime.datetime(year=2022, month=4, day=5, hour=1, minute=00, second=00)
relativedelta = datetime.timedelta(hours=1)
iteration_time = datetime.timedelta(minutes=5)
end_time = start_time + relativedelta
last_run = None
def func():
print("this is python")
while True:
current_time = datetime.datetime.now()
if start_time <= current_time <= end_time:
if last_run:
if current_time >= last_run + iteration_time:
func()
last_run = current_time
else:
last_run = current_time
elif current_time > end_time:
break
time.sleep(1)
此代码从 4/5/2022 1:00:00AM (start_time) 1 小时 (relativedelta)
这可以通过手动实施来实现。
本质上,您需要不断循环,直到达到“活动”时间 window。到达那里后,您基本上执行您的函数并拒绝再次 运行 直到指定的执行间隔过去。主循环不需要尽可能频繁地执行,但只要它比执行间隔合理地小,偶尔 运行 就足够了。这实际上是一种限制执行率(节流)的方法。另外,函数的执行时间func
要小于间隔,否则会跳过一次或多次执行。
import datetime
import time
def repeat_between(
start_dt,
stop_dt,
interval_td,
func,
func_args=None,
func_kws=None,
collect_results=True,
throttling_s=1):
# ensure valid `func_args` and `func_kws`
func_args = () if func_args is None else tuple(func_args)
func_kws = {} if func_kws is None else dict(func_kws)
# initialize current datetime and last run
curr_dt = datetime.datetime.now()
last_run = None
# ensure the start datetime is:
# - before the stop datetime
# - after the current datetime
if stop_dt < start_dt < curr_dt:
return
else:
# collect results here
result = []
# wait until reaching the start datetime
wait_td = (start_dt - curr_dt)
time.sleep(wait_td.total_seconds())
# loop until current datetime exceeds the stop datetime
while curr_dt <= stop_dt:
# if current time is
# - past the start datetime
# - near an interval timedelta
if curr_dt >= start_dt and \
(not last_run or curr_dt >= last_run + interval_td):
curr_result = func(*func_args, **func_kws)
if collect_results:
result.append(curr.result)
last_run = curr_dt
# wait some time before checking again
if throttling_s > 0:
time.sleep(throttling_s)
# update current time
curr_dt = datetime.datetime.now()
要对此进行测试,可以使用例如:
r = repeat_between(
datetime.datetime.now() + datetime.timedelta(seconds=3),
datetime.datetime.now() + datetime.timedelta(seconds=10),
datetime.timedelta(seconds=2),
func=lambda: (datetime.datetime.now(), 'Hello!'),
throttling_s=0.1
)
print(r)
# [(datetime.datetime(2022, 4, 8, 15, 38, 21, 525347), 'Hello!'),
# (datetime.datetime(2022, 4, 8, 15, 38, 23, 530025), 'Hello!'),
# (datetime.datetime(2022, 4, 8, 15, 38, 25, 534628), 'Hello!'),
# (datetime.datetime(2022, 4, 8, 15, 38, 27, 539120), 'Hello!')]
我相信这可以被认为是一个 object-oriented “规范”解决方案,它创建一个 Thread
subclass 实例,该实例将每隔 datetime.timedelta
单位重复调用指定的函数直至取消。 运行ning 的起始时间和剩余时间 而不是 详细说明 class 关注的内容,并留给使用 [=23= 的代码] 确定。
由于大部分操作发生在单独的线程中,如果需要,主线程可以同时执行其他操作。
import datetime
from threading import Thread, Event
import time
from typing import Callable
class TimedCalls(Thread):
"""Call function again every `interval` time duration after it's first run."""
def __init__(self, func: Callable, interval: datetime.timedelta) -> None:
super().__init__()
self.func = func
self.interval = interval
self.stopped = Event()
def cancel(self):
self.stopped.set()
def run(self):
next_call = time.time()
while not self.stopped.is_set():
self.func() # Target activity.
next_call = next_call + self.interval
# Block until beginning of next interval (unless canceled).
self.stopped.wait(next_call - time.time())
def my_function():
print(f"this is python: {time.strftime('%H:%M:%S', time.localtime())}")
# Start test a few secs from now.
start_time = datetime.datetime.now() + datetime.timedelta(seconds=5)
run_time = datetime.timedelta(minutes=2) # How long to iterate function.
end_time = start_time + run_time
assert start_time > datetime.datetime.now(), 'Start time must be in future'
timed_calls = TimedCalls(my_function, 10) # Thread to call function every 10 secs.
print(f'waiting until {start_time.strftime("%H:%M:%S")} to begin...')
wait_time = start_time - datetime.datetime.now()
time.sleep(wait_time.total_seconds())
print('starting')
timed_calls.start() # Start thread.
while datetime.datetime.now() < end_time:
time.sleep(1) # Twiddle thumbs while waiting.
print('done')
timed_calls.cancel()
样本运行:
waiting until 11:58:30 to begin...
starting
this is python: 11:58:30
this is python: 11:58:40
this is python: 11:58:50
this is python: 11:59:00
this is python: 11:59:10
this is python: 11:59:20
this is python: 11:59:30
this is python: 11:59:40
this is python: 11:59:50
this is python: 12:00:00
this is python: 12:00:10
this is python: 12:00:20
done
我正在尝试创建一个 while 循环,它将在 2 个时间对象之间迭代,while datetime.datetime.now().time() <= datetime.datetime.now() +relativedelta(hour=1):
但是每隔 n 分钟或第二个时间间隔。因此,如果开始时间是 1:00 AM,则下一次迭代应在 1:05 AM 开始,n 为 5 分钟。所以迭代应该在开始时间的 5 分钟后开始,而不是例如,从迭代结束开始,这是使用 sleep
时的情况。您能告诉我如何实现吗?
一个可能的解决方案来自这里: write python script that is executed every 5 minutes
import schedule
import time
def func():
print("this is python")
schedule.every(5).minutes.do(func)
while True:
schedule.run_pending()
time.sleep(1)
有了这个,开始时间必须是凌晨 1 点。其次,如果程序需要 运行 说 5 分钟 + 1 怎么办。在那种情况下,6 分钟的间隔将不起作用。
你试过了吗time.sleep(300) 其中 300 是秒。
如果您希望您的程序每 5 分钟 运行,您可以使用 time.sleep
import time
while true:
#program
time.sleep(300)
如果您想在日期之间进行迭代,请使用此模板:
from datetime import timedelta
start_date = date_utils.parse('2021-01-01')
end_date = datetime.datetime.now()
while start_date <= end_date:
one_hour = timedelta(hours=1)
one_minute = timedelta(minutes=1)
start_date = start_date + datetime.timedelta(days=1)
虽然 schedule
库有很多功能,但我认为下面的代码将帮助您获得您想要的。您可以简单地更改 start_time
、relativedelta
和 iteration_time
import time
import datetime
start_time = datetime.datetime(year=2022, month=4, day=5, hour=1, minute=00, second=00)
relativedelta = datetime.timedelta(hours=1)
iteration_time = datetime.timedelta(minutes=5)
end_time = start_time + relativedelta
last_run = None
def func():
print("this is python")
while True:
current_time = datetime.datetime.now()
if start_time <= current_time <= end_time:
if last_run:
if current_time >= last_run + iteration_time:
func()
last_run = current_time
else:
last_run = current_time
elif current_time > end_time:
break
time.sleep(1)
此代码从 4/5/2022 1:00:00AM (start_time) 1 小时 (relativedelta)
这可以通过手动实施来实现。
本质上,您需要不断循环,直到达到“活动”时间 window。到达那里后,您基本上执行您的函数并拒绝再次 运行 直到指定的执行间隔过去。主循环不需要尽可能频繁地执行,但只要它比执行间隔合理地小,偶尔 运行 就足够了。这实际上是一种限制执行率(节流)的方法。另外,函数的执行时间func
要小于间隔,否则会跳过一次或多次执行。
import datetime
import time
def repeat_between(
start_dt,
stop_dt,
interval_td,
func,
func_args=None,
func_kws=None,
collect_results=True,
throttling_s=1):
# ensure valid `func_args` and `func_kws`
func_args = () if func_args is None else tuple(func_args)
func_kws = {} if func_kws is None else dict(func_kws)
# initialize current datetime and last run
curr_dt = datetime.datetime.now()
last_run = None
# ensure the start datetime is:
# - before the stop datetime
# - after the current datetime
if stop_dt < start_dt < curr_dt:
return
else:
# collect results here
result = []
# wait until reaching the start datetime
wait_td = (start_dt - curr_dt)
time.sleep(wait_td.total_seconds())
# loop until current datetime exceeds the stop datetime
while curr_dt <= stop_dt:
# if current time is
# - past the start datetime
# - near an interval timedelta
if curr_dt >= start_dt and \
(not last_run or curr_dt >= last_run + interval_td):
curr_result = func(*func_args, **func_kws)
if collect_results:
result.append(curr.result)
last_run = curr_dt
# wait some time before checking again
if throttling_s > 0:
time.sleep(throttling_s)
# update current time
curr_dt = datetime.datetime.now()
要对此进行测试,可以使用例如:
r = repeat_between(
datetime.datetime.now() + datetime.timedelta(seconds=3),
datetime.datetime.now() + datetime.timedelta(seconds=10),
datetime.timedelta(seconds=2),
func=lambda: (datetime.datetime.now(), 'Hello!'),
throttling_s=0.1
)
print(r)
# [(datetime.datetime(2022, 4, 8, 15, 38, 21, 525347), 'Hello!'),
# (datetime.datetime(2022, 4, 8, 15, 38, 23, 530025), 'Hello!'),
# (datetime.datetime(2022, 4, 8, 15, 38, 25, 534628), 'Hello!'),
# (datetime.datetime(2022, 4, 8, 15, 38, 27, 539120), 'Hello!')]
我相信这可以被认为是一个 object-oriented “规范”解决方案,它创建一个 Thread
subclass 实例,该实例将每隔 datetime.timedelta
单位重复调用指定的函数直至取消。 运行ning 的起始时间和剩余时间 而不是 详细说明 class 关注的内容,并留给使用 [=23= 的代码] 确定。
由于大部分操作发生在单独的线程中,如果需要,主线程可以同时执行其他操作。
import datetime
from threading import Thread, Event
import time
from typing import Callable
class TimedCalls(Thread):
"""Call function again every `interval` time duration after it's first run."""
def __init__(self, func: Callable, interval: datetime.timedelta) -> None:
super().__init__()
self.func = func
self.interval = interval
self.stopped = Event()
def cancel(self):
self.stopped.set()
def run(self):
next_call = time.time()
while not self.stopped.is_set():
self.func() # Target activity.
next_call = next_call + self.interval
# Block until beginning of next interval (unless canceled).
self.stopped.wait(next_call - time.time())
def my_function():
print(f"this is python: {time.strftime('%H:%M:%S', time.localtime())}")
# Start test a few secs from now.
start_time = datetime.datetime.now() + datetime.timedelta(seconds=5)
run_time = datetime.timedelta(minutes=2) # How long to iterate function.
end_time = start_time + run_time
assert start_time > datetime.datetime.now(), 'Start time must be in future'
timed_calls = TimedCalls(my_function, 10) # Thread to call function every 10 secs.
print(f'waiting until {start_time.strftime("%H:%M:%S")} to begin...')
wait_time = start_time - datetime.datetime.now()
time.sleep(wait_time.total_seconds())
print('starting')
timed_calls.start() # Start thread.
while datetime.datetime.now() < end_time:
time.sleep(1) # Twiddle thumbs while waiting.
print('done')
timed_calls.cancel()
样本运行:
waiting until 11:58:30 to begin...
starting
this is python: 11:58:30
this is python: 11:58:40
this is python: 11:58:50
this is python: 11:59:00
this is python: 11:59:10
this is python: 11:59:20
this is python: 11:59:30
this is python: 11:59:40
this is python: 11:59:50
this is python: 12:00:00
this is python: 12:00:10
this is python: 12:00:20
done