APScheduler 运行 Tornado 中的异步函数 Python

APScheduler run async function in Tornado Python

我正在尝试开发一个小应用程序,它将从 API 收集天气数据。我已经使用 APScheduler 每 x 分钟执行一次函数。我使用 Python Tornado 框架。

我得到的错误是:

INFO     Job "GetWeather (trigger: interval[0:01:00], next run at: 2015-03-28 11:40:58 CET)" executed successfully
ERROR    Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x0335C978>, <tornado.concurrent.Future object at 0x03374430>)
Traceback (most recent call last):
  File "C:\Python34\Lib\site-packages\tornado\ioloop.py", line 568, in _run_callback
    ret = callback()
  File "C:\Python34\Lib\site-packages\tornado\stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
greenlet.error: cannot switch to a different thread

我认为这是来自 GetWeather() 的协程,因为如果我从中删除所有异步功能,它就可以工作。

我正在使用 Motor 读取所需的坐标并将它们传递给 API 并将天气数据存储在 MongoDB 中。

import os.path, logging
import tornado.web
import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient
from tornado import gen
from tornado.options import define, options
from apscheduler.schedulers.tornado import TornadoScheduler
import motor

client = motor.MotorClient()
db = client['apitest']

console_log = logging.getLogger(__name__)

define("port", default=8888, help="run on the given port", type=int)
define("debug", default=False, help="run in debug mode")

class MainRequest (tornado.web.RequestHandler):
    def get(self):
        self.write("Hello")

scheduler = TornadoScheduler()

class ScheduledTasks(object):
    def get(self):
        print("This is the scheduler");

def AddJobs():
    scheduler.add_job(GetWeather, 'interval', minutes=1)

def StartScheduler():
    scheduler.start();

def StopScheduler():
    scheduler.stop();

class Weather(tornado.web.RequestHandler):
    def get(self):
        self.write("This is the Weather Robot!")
        GetWeather()

@gen.coroutine
def GetWeather():
    '''
    Getting city weather from forecast.io API
    '''
    console_log.debug('Start: weather robot')    
    cursor = FindCities()

    while (yield cursor.fetch_next):
        city = cursor.next_object()
        lat = str(city["lat"])
        lon = str(city["lon"])     
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("https://api.forecast.io/forecast/3925d0668cf520768ca855951f1097cd/%s,%s" %(lat, lon))

        if response.error:
            print ("Error:", response.error)
            # Store all cities with errors in order to save them in the log file
        else:         
            json = tornado.escape.json_decode(response.body)
            temperature =  json["currently"]["temperature"]
            summary = json["currently"]["summary"]
            db.cities.update({'_id': city["_id"]}, {'$set': {'temperature': temperature, 'summary': summary}})

    console_log.debug('End: weather robot')
    return

def FindCities():
    '''
    cities = [{
                "_id" : ObjectId("55165d07258058ee8dca2172"),
                "name" : "London",
                "country" : "United Kingdom",
                "lat" : 51.507351,
                "lon" : -0.127758
            },
            {
                "_id" : ObjectId("55165d07258058ee8dca2173"),
                "name" : "Barcelona",
                "country" : "Spain",
                "lat" : 41.385064,
                "lon" : 2.173403
            }      
    '''
    cities = db.cities.find().sort([('_id', -1)])
    return cities

def main():
    logging.basicConfig(level=logging.DEBUG,format='%(levelname)-8s %(message)s')
    app = tornado.web.Application(
            [
                (r'/robots/weather', Weather),
                (r'/', MainRequest)
            ],
            cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
            login_url="/auth/login",
            template_path=os.path.join(os.path.dirname(__file__), "templates"),
            static_path=os.path.join(os.path.dirname(__file__), "static"),
            xsrf_cookies=True,
            debug=options.debug,
        )
    app.listen(options.port)
    AddJobs()
    StartScheduler()
    tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
    main()

知道我做错了什么吗?正如我在 APScheduler 代码中看到的那样,TornadoScheduler() 在 Tornado IOLoop 中运行... (https://bitbucket.org/agronholm/apscheduler/src/a34075b0037dba46735bae67f598ec6133003ef1/apscheduler/schedulers/tornado.py?at=master)

哦!忘了说了,思路是可以通过APScheduler执行任务,也可以手动执行。

非常感谢!

在我看来像 TornadoScheduler,即使它与 IOLoop 集成,它 still runs operations on a thread pool:

def _create_default_executor(self):
    """Creates a default executor store, specific to the particular scheduler type."""
    return ThreadPoolExecutor()

Motor 不喜欢 运行 在同一进程中的多个线程上——我只测试在主线程上使用 Motor 的用例。

我认为您应该使用 Tornado PeriodicCallback 而不是 APScheduler,或者您应该将 PyMongo 与 APScheduler 一起使用(因此 PyMongo 在后台线程上 运行)而不是 Motor。

默认情况下,TornadoScheduler 运行s 在线程池中安排任务。但是,您的特定任务使用 IOLoop,因此预计在同一线程中为 运行。要解决此问题,您可以使用龙卷风 IOLoop 的 add_callback() 方法尽快将任务安排在 IOLoop 的线程中 运行。

像这样:

def your_scheduled_task():
    IOLoop.instance().add_callback(your_real_task_function)

甚至更好:

scheduler.add_job(IOLoop.instance().add_callback, 'interval', minutes=1, args=[GetWeather])