如何为 Airflow 连接显式声明 charset=utf8
How to explicitly declare charset=utf8 for Airflow connections
这个序列:
from airflow.hooks.mysql_hook import MySqlHook
conn = MySqlHook(mysql_conn_id='conn_id')
engine = conn.get_sqlalchemy_engine()
df.to_sql('test_table', engine, if_exists='append', index=False)
生成以下内容:
UnicodeEncodeError: 'latin-1' codec can't encode character '\ufffd' in position 57: ordinal not in range(256)
这个序列效果很好:
from sqlalchemy import create_engine
engine = create_engine("mysql://{0}:{1}@{2}/capone?charset=utf8".format(user, pwd, host))
df.to_sql('test_table', engine, if_exists='append', index=False)
关键在于显式声明 charset
。我试图在气流中使用 {"charset": "utf8"}
执行此操作:
但这并没有修复错误。进行更改后,我重新启动了我的开发环境,管理面板让我知道编辑成功。我如何使用 Airflow 连接到我的字符集作为 utf8?
我意识到这是 Airflow 中的一个错误,我已在此处报告:https://issues.apache.org/jira/browse/AIRFLOW-4824
现在我有一个使用以下代码的解决方法:
def get_uri(hook):
conn = hook.get_connection(getattr(hook, hook.conn_name_attr))
login = ''
if conn.login:
login = '{conn.login}:{conn.password}@'.format(conn=conn)
host = conn.host
if conn.port is not None:
host += ':{port}'.format(port=conn.port)
charset = ''
if conn.extra_dejson.get('charset', False):
chrs = conn.extra_dejson["charset"]
if chrs.lower() == 'utf8' or chrs.lower() == 'utf-8':
charset = '?charset=utf8'
return '{conn.conn_type}://{login}{host}/{conn.schema}{charset}'.format(
conn=conn, login=login, host=host, charset=charset)
然后使用如下:
url = get_uri(sql_hook)
from sqlalchemy import create_engine
engine = create_engine(url)
真正的解决方案是向项目发送拉取请求以覆盖 mysql_hook.py 中的 get_uri。
from sqlalchemy import create_engine
from airflow.hooks.mysql_hook import MySqlHook
conn = MySqlHook(mysql_conn_id='conn_id')
uri = conn.get_uri()
engine = create_engine(uri+'?charset=utf8')
df.to_sql('test_table', engine, if_exists='append', index=False)
我通过上面的代码解决了这个问题。
我顺便解决了这个问题并且运行良好(在文件 airflow.cfg 中编辑):
sql_alchemy_conn = mysql://user:password@host:port/airflow?charset=utf8
这个序列:
from airflow.hooks.mysql_hook import MySqlHook
conn = MySqlHook(mysql_conn_id='conn_id')
engine = conn.get_sqlalchemy_engine()
df.to_sql('test_table', engine, if_exists='append', index=False)
生成以下内容:
UnicodeEncodeError: 'latin-1' codec can't encode character '\ufffd' in position 57: ordinal not in range(256)
这个序列效果很好:
from sqlalchemy import create_engine
engine = create_engine("mysql://{0}:{1}@{2}/capone?charset=utf8".format(user, pwd, host))
df.to_sql('test_table', engine, if_exists='append', index=False)
关键在于显式声明 charset
。我试图在气流中使用 {"charset": "utf8"}
执行此操作:
但这并没有修复错误。进行更改后,我重新启动了我的开发环境,管理面板让我知道编辑成功。我如何使用 Airflow 连接到我的字符集作为 utf8?
我意识到这是 Airflow 中的一个错误,我已在此处报告:https://issues.apache.org/jira/browse/AIRFLOW-4824
现在我有一个使用以下代码的解决方法:
def get_uri(hook):
conn = hook.get_connection(getattr(hook, hook.conn_name_attr))
login = ''
if conn.login:
login = '{conn.login}:{conn.password}@'.format(conn=conn)
host = conn.host
if conn.port is not None:
host += ':{port}'.format(port=conn.port)
charset = ''
if conn.extra_dejson.get('charset', False):
chrs = conn.extra_dejson["charset"]
if chrs.lower() == 'utf8' or chrs.lower() == 'utf-8':
charset = '?charset=utf8'
return '{conn.conn_type}://{login}{host}/{conn.schema}{charset}'.format(
conn=conn, login=login, host=host, charset=charset)
然后使用如下:
url = get_uri(sql_hook)
from sqlalchemy import create_engine
engine = create_engine(url)
真正的解决方案是向项目发送拉取请求以覆盖 mysql_hook.py 中的 get_uri。
from sqlalchemy import create_engine
from airflow.hooks.mysql_hook import MySqlHook
conn = MySqlHook(mysql_conn_id='conn_id')
uri = conn.get_uri()
engine = create_engine(uri+'?charset=utf8')
df.to_sql('test_table', engine, if_exists='append', index=False)
我通过上面的代码解决了这个问题。
我顺便解决了这个问题并且运行良好(在文件 airflow.cfg 中编辑):
sql_alchemy_conn = mysql://user:password@host:port/airflow?charset=utf8