如何使用python进行调度?

How to perform scheduling using python?

我正在尝试在我的 python 中安排一些工作。假设,来自日志记录的文本应该每 1 分钟和每 5 分钟从我的 docker 容器内的 jobs.py 文件中出现。但是,文本在 docker 容器内每 2 分钟出现一次。 python 计划和 cronjobs 之间是否有冲突?

docker 容器内的当前输出

13:05:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:05:00] "GET /reminder/send_reminders HTTP/1.1" 200 -

13:06:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:06:00] "GET /feeds/update_feeds HTTP/1.1" 200 -

13:07:00 [D] schedule Running job Job(interval=1, unit=minutes, do=job_feeds_update, args=(), kwargs={})

13:07:00 [I] jobs job_feeds_update

13:07:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:07:00] "GET /feeds/update_feeds HTTP/1.1" 200 -

13:08:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:08:00] "GET /feeds/update_feeds HTTP/1.1" 200 -

13:09:00 [D] schedule Running job Job(interval=1, unit=minutes, do=job_feeds_update, args=(), kwargs={})

13:09:00 [I] jobs job_feeds_update

13:09:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:09:00] "GET /feeds/update_feeds HTTP/1.1" 200 -

13:10:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:10:00] "GET /feeds/update_feeds HTTP/1.1" 200 -

13:10:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:10:00] "GET /reminder/send_reminders HTTP/1.1" 200 -

13:11:00 [D] schedule Running job Job(interval=1, unit=minutes, do=job_feeds_update, args=(), kwargs={})

13:11:00 [I] jobs job_feeds_update

13:11:00 [D] schedule Running job Job(interval=5, unit=minutes, do=job_send_reminders, args=(), kwargs={})

13:11:00 [I] jobs job_send_reminders

server.py

#Cron Job
@app.route('/feeds/update_feeds')
def update_feeds():
    schedule.run_pending()
    return 'OK UPDATED FEED!'
   
@app.route('/reminder/send_reminders')
def send_reminders():
    schedule.run_pending()
    return 'OK UPDATED STATUS!'
    

jobs.py

def job_feeds_update():
    update_feed()
    update_feed_eng()
    logger.info("job_feeds_update")
        
schedule.every(1).minutes.do(job_feeds_update)

# send email reminders
def job_send_reminders():
    send_reminders()
    logger.info("job_send_reminders")
schedule.every(5).minutes.do(job_send_reminders) 

Docker 文件

FROM alpine:latest

# Install curlt 
RUN apk add --no-cache curl

# Copy Scripts to Docker Image
COPY reminders.sh /usr/local/bin/reminders.sh
COPY feeds.sh /usr/local/bin/feeds.sh

RUN echo ' */5  *  *  *  * /usr/local/bin/reminders.sh' >> /etc/crontabs/root
RUN echo ' *  *  *  *  * /usr/local/bin/feeds.sh' >> /etc/crontabs/root

# Run crond  -f for Foreground 
CMD ["/usr/sbin/crond", "-f"]

我认为您运行遇到了几个问题:

  1. 如您所料,您的 schedule 与您的 cron 作业处于不同的 schedule/interval。它们不同步(由于下一个原因,您永远不能指望它们同步)。从您的 jobs.py 脚本执行的那一刻起,这就是计划计算间隔的起点。

即如果您每分钟 运行 执行某些操作,但 jobs.py 脚本在当前分钟的 30 秒后开始(即 01:00:30 - 1:00am 30 秒后),那么调度程序将 运行 1:01:30 的工作,然后 1:02:30,然后 1:03:30 等等。

  1. Scheduledoesn't guarantee you precise频率执行。当调度器运行s 一个作业时,作业执行时间不被考虑在内。因此,如果您安排诸如 feeds/reminders 工作之类的事情,则可能需要一点时间来处理。完成 运行ning 后,调度程序决定下一个作业仅在上一个作业 结束后 运行 1 分钟 。这意味着您的执行时间可能会超出计划。

试试 运行在 python 脚本中使用这个例子看看我在说什么

# Schedule Library imported
import schedule
import time
from datetime import datetime
     
def geeks():
    now = datetime.now() # current date and time
    date_time = now.strftime("%m/%d/%Y, %H:%M:%S")
    time.sleep(5)
    print(date_time + "- Look at the timestamp")
 
geeks();
# Task scheduling
# After every 10mins geeks() is called.
schedule.every(1).seconds.do(geeks)
 
# Loop so that the scheduling task
# keeps on running all time.
while True:
 
    # Checks whether a scheduled task
    # is pending to run or not
    schedule.run_pending()
    time.sleep(0.1)

我们已将 geeks 函数安排为每秒 运行。但是如果你看一下 geeks 函数,我添加了一个 time.sleep(5) 来假装这里可能有一些阻塞的 API 调用可能需要 5 秒。然后观察记录的时间戳 - 您会发现它们并不总是与我们最初想要的时间表一致!

现在看看你的 cron 作业和调度程序是如何不同步的

查看以下日志:

13:07:00 [D] schedule Running job Job(interval=1, unit=minutes, do=job_feeds_update, args=(), kwargs={})
13:07:00 [I] jobs job_feeds_update
13:07:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:07:00] "GET /feeds/update_feeds HTTP/1.1" 200 -

# minute 8 doesn't trigger the schedule for feeds

13:09:00 [D] schedule Running job Job(interval=1, unit=minutes, do=job_feeds_update, args=(), kwargs={})
13:09:00 [I] jobs job_feeds_update
13:09:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:09:00] "GET /feeds/update_feeds HTTP/1.1" 200 -

这里可能发生的情况如下:

  • 在 13:07:00,您的 cron 发送请求以供稿项目

  • 在 13:07:00,作业计划有一个待处理的供稿项作业

  • at 13:07:00:,作业完成,调度决定下一个作业只能在 运行 从现在起 1 分钟后,大约是 ~13:08:01 (注意 01,这是为了说明 milliseconds/timing 的作业执行,假设 运行 提要项目更新需要 1 秒)

  • 在 13:08:00,您的 cron 作业触发了询问 schedule run_pending 个作业的请求。

  • 在13:08:00 但是没有等待运行的工作,因为下一次 Feed 项目可以 运行 是 13:08:01,现在不是。

  • 在 13:09:00,您的 cron 选项卡再次触发请求

  • 在 13:09:00,有一个待处理的工作应该 运行 在 13:08:01 这样立即执行。

我希望这能说明您 运行 正在陷入 cron 和计划之间不同步的问题。这个问题在生产环境中会变得更糟。您可以阅读有关 Parallel execution for schedule 的更多信息,作为将事情从主线程中分离出来的一种方法,但这只会到此为止。让我们谈谈...

可能的解决方案

  1. 使用时间表 run_all 而不是 run_pending 来强制触发作业,无论它们实际安排在什么时候。

但是如果您仔细想想,这与直接从您的 API 路由本身直接调用 job_feeds_update 没有什么不同。这本身并不是一个坏主意,但它仍然不是超级干净,因为它会阻塞 API 服务器的主线程,直到 job_feeds_update 完成,如果你有其他路线,这可能不是理想的选择用户需要。

您可以将此与下一个建议结合起来:

  1. 使用作业队列和线程 查看日程表文档 Parallel Execution page 中的第二个示例。它向您展示了如何使用作业队列和线程来卸载作业。

因为你 运行 schedule.run_pending(),你服务器中的主线程被阻塞,直到作业 运行。通过使用线程(+ 作业队列),您可以继续在队列中安排作业 + 避免作业阻塞主服务器。通过继续安排作业,这应该会为您进一步优化一些事情。

  1. 使用 ischedule 而不是 ,因为它考虑了作业执行时间并提供精确的计划:https://pypi.org/project/ischedule/。这可能是对您来说最简单的解决方案,以防 1+2 最终令人头疼!

  2. 不要使用 schedule 并简单地让你的 cron 作业按 运行 实际功能的路线(所以基本上与使用上面 1+2 的建议)。这样做的问题是,如果您的函数 运行 需要超过一分钟的时间来更新提要,您可能有多个重叠的 cron 作业 运行 同时进行提要更新。因此,我建议不要这样做并依赖一种机制 queue/schedule 您对线程和作业的请求。仅将此作为您可以做的其他事情的潜在场景。