Schedule periodic non-blocking tasks in python without dealing with the GIL
I've been using Python's schedule module to schedule overlapping tasks that don't block each other; however, the recommended way to achieve non-blocking behavior is to use Python's threading module (which, of course, is still subject to the GIL).
Quoting from the schedule module's docs:
import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(1).seconds.do(run_threaded, job)
schedule.every(2).seconds.do(run_threaded, job)
schedule.every(3).seconds.do(run_threaded, job)
schedule.every(4).seconds.do(run_threaded, job)
schedule.every(5).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)
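The catch with the threaded recipe is that every job still runs inside the same interpreter, so CPU-bound work inside job() ends up serialized by the GIL even though the scheduler loop itself never blocks. Here is a minimal sketch of that effect (my own illustration, not from the schedule docs; cpu_bound, time_workers and the worker count are made up for demonstration): several threads chewing on a pure-Python loop take roughly as long as running it sequentially, while the same number of processes actually run in parallel.

import time
from threading import Thread
from multiprocessing import Process

def cpu_bound(n=10000000):
    """Pure-Python loop; a thread holds the GIL the whole time it runs this"""
    total = 0
    for i in range(n):
        total += i
    return total

def time_workers(worker_cls, count=4):
    """Start `count` workers of the given class and time how long they take"""
    workers = [worker_cls(target=cpu_bound) for _ in range(count)]
    start = time.time()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.time() - start

if __name__ == '__main__':
    # Threads share one interpreter, so this CPU-bound work is serialized by
    # the GIL; each Process gets its own interpreter (and its own GIL).
    print("threads:   %.2f sec" % time_workers(Thread))
    print("processes: %.2f sec" % time_workers(Process))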
How do we schedule and execute tasks without worrying about the GIL?
A relatively simple way to solve this is to use the multiprocessing module... it gives us cron-like behavior without having to worry about the GIL...
Avoiding the GIL with multiprocessing is nothing new... I'm just documenting it here in the hope that it helps future googlers...
from multiprocessing import Process
from datetime import datetime
import time

from schedule import Scheduler

class MPScheduler(Scheduler):

    def __init__(self, args=None, kwargs=None):
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        super(MPScheduler, self).__init__(*args, **kwargs)
        # Among other things, this object inherits self.jobs (a list of jobs)
        self.args = args
        self.kwargs = kwargs
        self.processes = list()

    def _mp_run_job(self, job_func):
        """Spawn another process to run the job; multiprocessing avoids GIL issues"""
        job_process = Process(target=job_func, args=self.args,
                              kwargs=self.kwargs)
        job_process.daemon = True
        job_process.start()
        self.processes.append(job_process)

    def run_pending(self):
        """Run any jobs which are ready"""
        runnable_jobs = (job_obj for job_obj in self.jobs if job_obj.should_run)
        for job_obj in sorted(runnable_jobs):
            job_obj.last_run = datetime.now()  # Housekeeping
            self._mp_run_job(job_obj.job_func)
            job_obj._schedule_next_run()       # Schedule the next execution datetime
        self._retire_finished_processes()

    def _retire_finished_processes(self):
        """Walk the list of processes and retire finished processes"""
        retirement_list = list()  # List of process indexes to remove
        for idx, process in enumerate(self.processes):
            if process.is_alive():
                # Wait a short time for the process to finish
                process.join(0.01)
            else:
                retirement_list.append(idx)

        ## Retire finished processes
        for process_idx in sorted(retirement_list, reverse=True):
            self.processes.pop(process_idx)

def job(id, hungry=True):
    print("{} running {} and hungry={}".format(datetime.now(), id, hungry))
    time.sleep(10)  # This job runs without blocking execution of other jobs

if __name__ == '__main__':
    # Build a schedule of overlapping jobs...
    mp_sched = MPScheduler()
    mp_sched.every(1).seconds.do(job, id=1, hungry=False)
    mp_sched.every(2).seconds.do(job, id=2)
    mp_sched.every(3).seconds.do(job, id=3)
    mp_sched.every(4).seconds.do(job, id=4)
    mp_sched.every(5).seconds.do(job, id=5)

    while True:
        mp_sched.run_pending()
        time.sleep(1)
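For comparison, here's a shorter variation on the same idea (my own sketch, not part of the answer above): keep the stock schedule module and hand each ready job to a concurrent.futures.ProcessPoolExecutor (Python 3) from the wrapper that schedule invokes. The name run_in_process and the max_workers=5 value are arbitrary choices for this illustration.

from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
import time

import schedule

def job(id, hungry=True):
    print("{} running {} and hungry={}".format(datetime.now(), id, hungry))
    time.sleep(10)  # Runs in a worker process, so it never blocks the loop

def run_in_process(executor, job_func, *args, **kwargs):
    """Submit the job to the process pool; schedule's loop returns immediately"""
    executor.submit(job_func, *args, **kwargs)

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=5) as executor:
        schedule.every(1).seconds.do(run_in_process, executor, job, id=1, hungry=False)
        schedule.every(2).seconds.do(run_in_process, executor, job, id=2)
        schedule.every(3).seconds.do(run_in_process, executor, job, id=3)

        while True:
            schedule.run_pending()
            time.sleep(1)

The trade-off versus MPScheduler is that the pool caps how many jobs can overlap: once all five workers are busy, further submissions queue up instead of spawning fresh processes.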