Apscheduler 作业序列化不起作用
Apscheduler Job Serialization not working
我们正在设置一个基于 Apscheduler 的调度程序,问题是我们无法序列化作业。
这是我们想要的结构。
一个class Base,用它的方法,就是这个class要运行
class Base(object):
def __init__(self, bim):
self.b = bim
pass
def print_some(self):
print "Some bout to go down"
A class BS 将安排 class 基本执行
from base import Base
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import logging
logging.basicConfig()
jobstores = {'default': SQLAlchemyJobStore(url='url_to_DB')}
class BS(object):
def __init__(self):
self.scheduler = BackgroundScheduler()
self.scheduler.configure(jobstores=jobstores)
self.scheduler.start()
self.base = Base()
def print_some(self):
print "ngnf"
def add_job(self):
self.scheduler.add_job(Base.print_some, 'interval', minutes=1)
BS().add_job()
但是当我 运行 代码时,我在将参数传递给构造函数时遇到问题
ERROR:apscheduler.executors.default:Job "Base.print_some (trigger: interval[0:00:01], next run at: 2016-05-24 18:16:27 CEST)" raised an exception
Traceback (most recent call last):
File "/Library/Python/2.7/site-packages/apscheduler/executors/base.py", line 112, in run_job
retval = job.func(*job.args, **job.kwargs)
TypeError: unbound method print_some() must be called with Base instance as first argument (got nothing instead)
我试过了
self.scheduler.add_job(Base.print_some, 'interval', args=[Base()], kwargs=dict(self=Base()), seconds=1)
还有这个
self.scheduler.add_job(Base.print_some, 'interval', args=[self.base], kwargs=dict(self.base), seconds=1)
None 他们成功了。
我错过了什么,或者应该做什么?
第二个示例无法工作,因为您的位置参数和关键字参数存在冲突。我假设第三个不起作用,因为 self.base 不是可迭代的。
您是否尝试过仅将实例作为位置参数传递?
我们正在设置一个基于 Apscheduler 的调度程序,问题是我们无法序列化作业。
这是我们想要的结构。
一个class Base,用它的方法,就是这个class要运行
class Base(object):
def __init__(self, bim):
self.b = bim
pass
def print_some(self):
print "Some bout to go down"
A class BS 将安排 class 基本执行
from base import Base
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import logging
logging.basicConfig()
jobstores = {'default': SQLAlchemyJobStore(url='url_to_DB')}
class BS(object):
def __init__(self):
self.scheduler = BackgroundScheduler()
self.scheduler.configure(jobstores=jobstores)
self.scheduler.start()
self.base = Base()
def print_some(self):
print "ngnf"
def add_job(self):
self.scheduler.add_job(Base.print_some, 'interval', minutes=1)
BS().add_job()
但是当我 运行 代码时,我在将参数传递给构造函数时遇到问题
ERROR:apscheduler.executors.default:Job "Base.print_some (trigger: interval[0:00:01], next run at: 2016-05-24 18:16:27 CEST)" raised an exception
Traceback (most recent call last):
File "/Library/Python/2.7/site-packages/apscheduler/executors/base.py", line 112, in run_job
retval = job.func(*job.args, **job.kwargs)
TypeError: unbound method print_some() must be called with Base instance as first argument (got nothing instead)
我试过了
self.scheduler.add_job(Base.print_some, 'interval', args=[Base()], kwargs=dict(self=Base()), seconds=1)
还有这个
self.scheduler.add_job(Base.print_some, 'interval', args=[self.base], kwargs=dict(self.base), seconds=1)
None 他们成功了。
我错过了什么,或者应该做什么?
第二个示例无法工作,因为您的位置参数和关键字参数存在冲突。我假设第三个不起作用,因为 self.base 不是可迭代的。 您是否尝试过仅将实例作为位置参数传递?