FLASK:如何与 mysql 服务器建立连接?

FLASK: How to establish connection with mysql server?

我已经有一个来自 Flask 的 mysql 连接,如下所示:

app.config['MYSQL_HOST'] = 'one.hostname.net'
app.config['MYSQL_USER'] = 'my_username'
app.config['MYSQL_PASSWORD'] = 'my_password'
app.config['MYSQL_DB'] = 'user_mydb'

mysql = MySQL(app)


通过此设置,我可以在 Flask 中使用 mysql 数据库连接。但是当涉及到 celery 任务时,它与 flask 在同一个 python 文件中。

@mycelery.task(bind=True, name='mytask')
def mytask(self, userid, port):
    cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
    cursor.execute('SELECT * FROM mytable WHERE id = %s', (userid,))

它抛出一个错误说

cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
AttributeError: 'NoneType' object has no attribute 'cursor'

我理解是因为芹菜没有建立 MySQL 连接。但是我怎样才能建立连接呢?这样我就不必在创建任务时连接 MySQL 服务器,就像我们已经通过 MySQL = MySQL(app) 建立连接的烧瓶一样??

这是我的 celery 设置,如果有帮助,请在此代码中添加一些内容

mycelery = Celery(app.name)
mycelery.conf.update({
    'broker_url': 'filesystem://',
    'broker_transport_options': {
        'data_folder_in': 'app/broker/out',
        'data_folder_out': 'app/broker/out',
        'data_folder_processed': 'app/broker/processed'
    },

    'result_persistent': False,
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})

首先,如果你想做数据库操作,你可以在芹菜任务中做,但你不必将数据库与芹菜连接。 你可以将flask和db连接起来,在你的项目中安装celery,在你的celery任务中进行db操作。

样本:

app.py

from flask import Flask
from flaskext.mysql import MySQL

app = Flask(__name__)
mysql = MySQL()
app.config['MYSQL_DATABASE_USER'] = ''
app.config['MYSQL_DATABASE_PASSWORD'] = ''
app.config['MYSQL_DATABASE_DB'] = ''
app.config['MYSQL_DATABASE_HOST'] = ''
mysql.init_app(app)

tasks.py

@celery.task
def db_connect_things(): 
    conn = mysql.connect()
    cursor =conn.cursor()
    sql_query = """select from where """
    cursor.execute(sql_query)
    ...
   

celery_config.py

from celery import Celery
celery = Celery(__name__)
celery = Celery('tasks', broker=) # rabbit,redis, ..
celery.conf.update({'CELERY_ACCEPT_CONTENT': ['pickle', 'json', 'msgpack', 'yaml']})
celery.conf.add_defaults(...)

celery.conf.update(CELERYBEAT_SCHEDULE={
    'db_connect_things': {
        'task': 'application.lib.tasks.db_connect_things',
        'schedule': crontab(minute=0, hour='*/12'),
    }})

class ContextTask(celery.Task):
...