django - apscheduler 如何在 uwsgi 重启时保留作业

django - apscheduler how to keep job when uwsgi restart

django - 2.2.12

apscheduler - 3.6.3

我想将通知设置为在用户执行特定任务两周后发送。

scheduler.py

import logging
import time

from apscheduler.schedulers.background import BackgroundScheduler

logger = logging.getLogger(__name__)

class JobLauncher:
    """Thin wrapper around one process-wide APScheduler ``BackgroundScheduler``.

    The scheduler is stored in the class attribute ``_sched``, so every
    ``JobLauncher`` instance operates on the same scheduler object.
    """

    # Scheduler shared by all instances; assigned in __init__.
    _sched = None

    def __init__(self):
        """Create a new ``BackgroundScheduler`` and start it.

        Each construction replaces the class-level scheduler. No job store is
        configured here, so the scheduler uses whatever APScheduler's default is.
        """
        JobLauncher._sched = BackgroundScheduler()
        JobLauncher._sched.start()

    def __str__(self):
        return "JobLauncher"

    def run(self, job):
        """Schedule ``job``; alias for :meth:`run_job`.

        Returns:
            ``True`` if the job was added, ``False`` if its id already exists.
        """
        return self.run_job(job)

    def stop(self, job):
        """Remove the scheduled job whose id equals ``job.name``.

        Any error raised by the scheduler's ``remove_job`` (for example when
        the id is unknown) propagates to the caller.
        """
        JobLauncher._sched.remove_job(job.name)

    def shutdown(self):
        """Shut the shared scheduler down if it is currently running.

        When the scheduler is not running, only a warning is logged.
        """
        # ``running`` is a property on APScheduler 3.x schedulers, not a
        # method: calling it fails with "'bool' object is not callable".
        if not JobLauncher._sched.running:
            logger.warning("Cannot shutdown scheduler because scheduler is not running at the moment. please check scheduler status.")
            return
        logger.debug('Scheduler is shutting down.')
        JobLauncher._sched.shutdown()

    def run_job(self, job):
        """Add ``job`` as a one-shot ``date``-triggered job.

        Args:
            job: Object exposing ``name`` (used as the job id), ``runMethod``
                (the callable to run), ``job_params`` (positional arguments)
                and ``job_date`` (the run date).

        Returns:
            ``True`` if the job was added, ``False`` if a job with the same id
            is already known to the scheduler.
        """
        if JobLauncher._sched.get_job(job.name) is not None:
            # A job with this id already exists; leave it untouched.
            return False
        JobLauncher._sched.add_job(
            func=job.runMethod,
            trigger='date',
            id=job.name,
            args=job.job_params,
            run_date=job.job_date,
        )
        return True

class CommonJob:
    """Plain description of a job to be handed to ``JobLauncher``.

    Attributes (all exposed as read/write properties):
        name: Identifier of the job.
        job_date: Date at which the job should run.
        job_params: Positional arguments for ``runMethod``.
        runMethod: Callable to execute.
    """

    def __init__(self, name=None, job_date=None, job_params=None, runMethod=None):
        """Initialise every field so a fresh instance is always readable.

        All arguments are optional, so the existing ``CommonJob()`` followed
        by attribute assignment keeps working; without this initialisation
        ``str(job)`` or any getter raised ``AttributeError`` until every
        setter had been called.
        """
        self._name = name
        self._job_date = job_date
        self._job_params = job_params
        self._runMethod = runMethod

    def __str__(self):
        return "Job Infos : {name : %s, job_params : %s}" % (self.name, self.job_params)

    @property
    def name(self):
        """Identifier of the job."""
        return self._name

    @name.setter
    def name(self, new_name):
        self._name = new_name

    @property
    def job_date(self):
        """Date at which the job should run."""
        return self._job_date

    @job_date.setter
    def job_date(self, new_job_date):
        self._job_date = new_job_date

    @property
    def job_params(self):
        """Positional arguments passed to ``runMethod``."""
        return self._job_params

    @job_params.setter
    def job_params(self, new_job_params):
        self._job_params = new_job_params

    @property
    def runMethod(self):
        """Callable executed when the job fires."""
        return self._runMethod

    @runMethod.setter
    def runMethod(self, new_runMethod):
        self._runMethod = new_runMethod
    


class JobLauncherHolder:
    """Holds a single lazily created ``JobLauncher`` for the whole process."""

    # Cached launcher; stays None until getInstance() first creates one.
    _launcher = None

    @staticmethod
    def getInstance():
        """Return the cached launcher, creating it on first use."""
        existing = JobLauncherHolder._launcher
        if existing:
            return existing

        created = JobLauncher()
        JobLauncherHolder._launcher = created
        return created

添加作业的代码

from utils.scheduler import JobLauncherHolder,CommonJob

def event(self, userUID):
    """Schedule ``self.testAlarm(userUID)`` to run 14 days from now.

    The job id is ``str(userUID) + 'alarm'``. Nothing is scheduled when no
    launcher is available.
    """
    scheduler_launcher = JobLauncherHolder.getInstance()
    if not scheduler_launcher:
        return

    alarm_job = CommonJob()
    alarm_job.name = str(userUID) + 'alarm'
    alarm_job.job_date = datetime.now() + timedelta(days=14)
    # NOTE(review): this is a bound method, so a persistent job store would
    # presumably have to serialize ``self`` along with it -- confirm that
    # ``self`` holds nothing unpicklable before switching job stores.
    alarm_job.runMethod = self.testAlarm
    alarm_job.job_params = [userUID]
    scheduler_launcher.run(alarm_job)

def testAlarm(self,userUID):
    """Job callback: forward ``userUID`` to ``sendTestFCM``."""
    sendTestFCM(userUID)

作为测试,我把 14 天改成 20 秒后再运行,可以正常收到通知。

但是当我把它改成 3 分钟,并在等待期间运行 sudo service uwsgi restart 时,作业没有运行。要在实际服务中使用它,即使因更新而重启,也必须保留该作业。

我试过使用 Redis 作为作业存储(RedisJobStore)

 def __init__(self):
        """Start the shared scheduler with a Redis job store as ``default``.

        The store is created for ``localhost:6379`` with the keys
        ``dispatched_trips_jobs`` and ``dispatched_trips_running``.
        """
        redis_store = RedisJobStore(
            jobs_key='dispatched_trips_jobs',
            run_times_key='dispatched_trips_running',
            host='localhost',
            port=6379,
        )
        JobLauncher._sched = BackgroundScheduler(jobstores={'default': redis_store})
        JobLauncher._sched.start()

引发错误

File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/django/core/handlers/exception.py", line 34, in inner
response = get_response(request)
 File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/django/core/handlers/base.py", line 115, in _get_response
    response = self.process_exception_by_middleware(e, request)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/django/core/handlers/base.py", line 113, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/django/views/decorators/csrf.py", line 54, in wrapped_view
    return view_func(*args, **kwargs)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/rest_framework/viewsets.py", line 116, in view
    return self.dispatch(request, *args, **kwargs)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/rest_framework/views.py", line 495, in dispatch
    response = self.handle_exception(exc)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/rest_framework/views.py", line 455, in handle_exception
    self.raise_uncaught_exception(exc)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/rest_framework/views.py", line 492, in dispatch
    response = handler(request, *args, **kwargs)
  File "./post/views.py", line 826, in scheduleTest
    launcher.run(job)
  File "./utils/scheduler.py", line 30, in run
    return self.run_job(job)
  File "./utils/scheduler.py", line 44, in run_job
    _job = JobLauncher._sched.add_job(func=job.runMethod, trigger='date', id=job.name, args=job.job_params,run_date = job.job_date)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 443, in add_job
    self._real_add_job(job, jobstore, replace_existing)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 867, in _real_add_job
    store.add_job(job)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/apscheduler/jobstores/redis.py", line 82, in add_job
    self.pickle_protocol))
TypeError: can't pickle uwsgi._Input objects

请告诉我在 uwsgi 重启时保留作业的最佳方法。内存、Redis、MySQL 这三种存储方式都可以使用。

发生 pickle 错误是因为您试图在其参数中添加一个包含不可序列化对象(在本例中为 uwsgi._Input)的持久作业。当计划的函数实际上是一个实例方法并且该实例包含一个不可序列化的对象时,有时会无意中发生这种情况。

要解决此问题,您需要检查传递给作业的参数,看看其中是否包含 uwsgi._Input 对象。同时请检查目标函数:如果它是实例方法,请确保该实例的成员中也不包含此类不可序列化的对象。