FLASK:如何与 mysql 服务器建立连接?
FLASK: How to establish connection with mysql server?
我已经有一个来自 Flask 的 mysql 连接,如下所示:
app.config['MYSQL_HOST'] = 'one.hostname.net'
app.config['MYSQL_USER'] = 'my_username'
app.config['MYSQL_PASSWORD'] = 'my_password'
app.config['MYSQL_DB'] = 'user_mydb'
mysql = MySQL(app)
通过此设置,我可以在 Flask 中使用 mysql 数据库连接。但是当涉及到 celery 任务时,它与 flask 在同一个 python 文件中。
@mycelery.task(bind=True, name='mytask')
def mytask(self, userid, port):
cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
cursor.execute('SELECT * FROM mytable WHERE id = %s', (userid,))
它抛出一个错误说
cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
AttributeError: 'NoneType' object has no attribute 'cursor'
我理解是因为芹菜没有建立 MySQL 连接。但是我怎样才能建立连接呢?这样我就不必在创建任务时连接 MySQL 服务器,就像我们已经通过 MySQL = MySQL(app)
建立连接的烧瓶一样??
这是我的 celery 设置,如果有帮助,请在此代码中添加一些内容
mycelery = Celery(app.name)
mycelery.conf.update({
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': 'app/broker/out',
'data_folder_out': 'app/broker/out',
'data_folder_processed': 'app/broker/processed'
},
'result_persistent': False,
'task_serializer': 'json',
'result_serializer': 'json',
'accept_content': ['json']})
首先,如果你想做数据库操作,你可以在芹菜任务中做,但你不必将数据库与芹菜连接。
你可以将flask和db连接起来,在你的项目中安装celery,在你的celery任务中进行db操作。
样本:
app.py
from flask import Flask
from flaskext.mysql import MySQL
app = Flask(__name__)
mysql = MySQL()
app.config['MYSQL_DATABASE_USER'] = ''
app.config['MYSQL_DATABASE_PASSWORD'] = ''
app.config['MYSQL_DATABASE_DB'] = ''
app.config['MYSQL_DATABASE_HOST'] = ''
mysql.init_app(app)
tasks.py
@celery.task
def db_connect_things():
conn = mysql.connect()
cursor =conn.cursor()
sql_query = """select from where """
cursor.execute(sql_query)
...
celery_config.py
from celery import Celery
celery = Celery(__name__)
celery = Celery('tasks', broker=) # rabbit,redis, ..
celery.conf.update({'CELERY_ACCEPT_CONTENT': ['pickle', 'json', 'msgpack', 'yaml']})
celery.conf.add_defaults(...)
celery.conf.update(CELERYBEAT_SCHEDULE={
'db_connect_things': {
'task': 'application.lib.tasks.db_connect_things',
'schedule': crontab(minute=0, hour='*/12'),
}})
class ContextTask(celery.Task):
...
我已经有一个来自 Flask 的 mysql 连接,如下所示:
app.config['MYSQL_HOST'] = 'one.hostname.net'
app.config['MYSQL_USER'] = 'my_username'
app.config['MYSQL_PASSWORD'] = 'my_password'
app.config['MYSQL_DB'] = 'user_mydb'
mysql = MySQL(app)
通过此设置,我可以在 Flask 中使用 mysql 数据库连接。但是当涉及到 celery 任务时,它与 flask 在同一个 python 文件中。
@mycelery.task(bind=True, name='mytask')
def mytask(self, userid, port):
cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
cursor.execute('SELECT * FROM mytable WHERE id = %s', (userid,))
它抛出一个错误说
cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
AttributeError: 'NoneType' object has no attribute 'cursor'
我理解是因为芹菜没有建立 MySQL 连接。但是我怎样才能建立连接呢?这样我就不必在创建任务时连接 MySQL 服务器,就像我们已经通过 MySQL = MySQL(app)
建立连接的烧瓶一样??
这是我的 celery 设置,如果有帮助,请在此代码中添加一些内容
mycelery = Celery(app.name)
mycelery.conf.update({
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': 'app/broker/out',
'data_folder_out': 'app/broker/out',
'data_folder_processed': 'app/broker/processed'
},
'result_persistent': False,
'task_serializer': 'json',
'result_serializer': 'json',
'accept_content': ['json']})
首先,如果你想做数据库操作,你可以在芹菜任务中做,但你不必将数据库与芹菜连接。 你可以将flask和db连接起来,在你的项目中安装celery,在你的celery任务中进行db操作。
样本:
app.py
from flask import Flask
from flaskext.mysql import MySQL
app = Flask(__name__)
mysql = MySQL()
app.config['MYSQL_DATABASE_USER'] = ''
app.config['MYSQL_DATABASE_PASSWORD'] = ''
app.config['MYSQL_DATABASE_DB'] = ''
app.config['MYSQL_DATABASE_HOST'] = ''
mysql.init_app(app)
tasks.py
@celery.task
def db_connect_things():
conn = mysql.connect()
cursor =conn.cursor()
sql_query = """select from where """
cursor.execute(sql_query)
...
celery_config.py
from celery import Celery
celery = Celery(__name__)
celery = Celery('tasks', broker=) # rabbit,redis, ..
celery.conf.update({'CELERY_ACCEPT_CONTENT': ['pickle', 'json', 'msgpack', 'yaml']})
celery.conf.add_defaults(...)
celery.conf.update(CELERYBEAT_SCHEDULE={
'db_connect_things': {
'task': 'application.lib.tasks.db_connect_things',
'schedule': crontab(minute=0, hour='*/12'),
}})
class ContextTask(celery.Task):
...