UnicodeDecodeError with MySQL to Cloud Storage Bucket Airflow DAG
I created a DAG that extracts MySQL data from a database and loads it to Cloud Storage, then into BigQuery as JSON files.
The DAG works for some tables but not all, because it can't decode certain characters in those tables. It's a fair amount of data, so I can't pinpoint exactly where the erroneous or invalid characters are.
I have tried changing my database, table, and column character sets from utf8 to utf8mb4. That didn't help.
I have also tried calling encoding='utf-8' and 'iso-8859-1', but I don't think I'm calling them correctly, since I've been doing it on my connection and I still get the same error.
I'm running Python 2.7.12 and Airflow v1.8.0.
Update: after reading https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls, which suggests using a connection string that defines the charset, e.g.: sql_alchemy_conn = mysql://airflow@localhost:3306/airflow?charset=utf8
How can this be done with a Cloud SQL instance?
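One commonly suggested route (a sketch, not verified against Cloud SQL specifically): Airflow's MySqlHook reads an optional "charset" key from the connection's Extra JSON and forwards it to MySQLdb.connect(), so the same effect as a ?charset=utf8 URI suffix can be had by putting {"charset": "utf8"} in the Extra field of the mysql_connection connection in the Airflow UI. The snippet below mimics what the hook does with that JSON:

```python
import json

# Hypothetical Extra field of the Airflow MySQL connection; "charset" is
# the key the hook looks up in the connection's extra_dejson.
extra_json = '{"charset": "utf8"}'

# Roughly what MySqlHook.get_conn() assembles before MySQLdb.connect():
conn_config = {}
extra = json.loads(extra_json)
if extra.get("charset"):
    conn_config["charset"] = extra["charset"]
    # With a charset set, MySQLdb should return unicode objects, which is
    # what the GCS operator's json.dump of each row expects.
    conn_config["use_unicode"] = True

print(conn_config)  # {'charset': 'utf8', 'use_unicode': True}
```

This avoids touching the sql_alchemy_conn string (which only configures Airflow's own metadata database, not the DAG's source connection).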
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

podio_connections = [
    'mysql_connection'
]

podio_tables = [
    'finance_banking_details',
    'finance_goods_invoices',
]

default_args = {
    'owner': 'xxxxxx',
    'start_date': datetime(2018, 1, 11),
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('mysql_to_bigquery', default_args=default_args, schedule_interval='@daily')

slack_notify = SlackAPIPostOperator(
    task_id='slack_notify',
    token='xxxxxx',
    channel='data-status',
    username='airflow',
    text='Successfully performed Podio ETL operation',
    dag=dag)

for connection in podio_connections:
    for table in podio_tables:
        extract = MySqlToGoogleCloudStorageOperator(
            task_id="extract_mysql_%s_%s" % (connection, table),
            mysql_conn_id=connection,
            google_cloud_storage_conn_id='gcp_connection',
            sql="SELECT *, '%s' as source FROM podiodb.%s" % (connection, table),
            bucket='podio-reader-storage',
            filename="%s/%s/%s{}.json" % (connection, table, table),
            schema_filename="%s/schemas/%s.json" % (connection, table),
            dag=dag)

        load = GoogleCloudStorageToBigQueryOperator(
            task_id="load_bg_%s_%s" % (connection, table),
            bigquery_conn_id='gcp_connection',
            google_cloud_storage_conn_id='gcp_connection',
            bucket='podio-reader-storage',
            #destination_project_dataset_table="podio-data.%s.%s" % (connection, table),
            destination_project_dataset_table="podio-data.podio_data1.%s" % (table),
            source_objects=["%s/%s/%s*.json" % (connection, table, table)],
            schema_object="%s/schemas/%s.json" % (connection, table),
            source_format='NEWLINE_DELIMITED_JSON',
            write_disposition='WRITE_TRUNCATE',
            dag=dag)

        load.set_upstream(extract)
        slack_notify.set_upstream(load)
[2018-01-12 15:36:10,221] {models.py:1417} ERROR - 'utf8' codec can't decode byte 0x96 in position 36: invalid start byte
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute
files_to_upload = self._write_local_data_files(cursor)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql_to_gcs.py", line 136, in _write_local_data_files
json.dump(row_dict, tmp_file_handle)
File "/usr/lib/python2.7/json/__init__.py", line 189, in dump
for chunk in iterable:
File "/usr/lib/python2.7/json/encoder.py", line 434, in _iterencode
for chunk in _iterencode_dict(o, _current_indent_level):
File "/usr/lib/python2.7/json/encoder.py", line 390, in _iterencode_dict
yield _encoder(value)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x96 in position 36: invalid start byte
0x96 is the latin1 hex for an "en-dash". Either change the data to utf8, or tell your MySQL connection that you are using charset latin1.
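A minimal reproduction of the failure, assuming the column holds latin1 bytes (MySQL's "latin1" is effectively Windows cp1252, where 0x96 is the en-dash; the surrounding text in this sketch is made up):

```python
# Bytes as stored under latin1/cp1252; 0x96 is an invalid UTF-8 start byte.
raw = b"Q4 2017 \x96 finance report"

decoded_ok = True
try:
    raw.decode("utf-8")
except UnicodeDecodeError as exc:
    decoded_ok = False
    print(exc)  # same "invalid start byte" error as in the traceback

# Decoding with the charset the data was actually written in succeeds:
text = raw.decode("cp1252")
print(text)  # the 0x96 byte comes out as an en-dash character
```

This is why declaring charset latin1 on the connection (or converting the stored data to utf8) fixes the operator's json.dump step.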