Airflow PostgresHook error codec can't decode byte 0xc3 in position 3
I want to read a postgres table into a dataframe for a data engineering pipeline. I am using Airflow to schedule these tasks. I created a connection named postgres_product_db in Airflow and tried to fetch the records with get_pandas_df.
db_hook = PostgresHook('postgres_product_db')
fetch_item = db_hook.get_pandas_df(request)
But it throws this error:
UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 3: ordinal not in range(128)
Full error log:
[2022-04-05, 06:46:27 UTC] {taskinstance.py:1774} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 174, in execute
return_value = self.execute_callable()
File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 188, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/azureuser/airflow/dags/foodstar_store1_pricing_update_program.py", line 81, in fetch_inventory
inventory = db_hook.get_pandas_df(request)
File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 138, in get_pandas_df
return psql.read_sql(sql, con=conn, params=parameters, **kwargs)
File "/home/azureuser/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 566, in read_sql
return pandas_sql.read_query(
File "/home/azureuser/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 2094, in read_query
data = self._fetchall_as_list(cursor)
File "/home/azureuser/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 2108, in _fetchall_as_list
result = cur.fetchall()
UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 3: ordinal not in range(128)
Normally, to get around this, I would set the client encoding like this:
import psycopg2

conn = psycopg2.connect(
    database="xxxxx", user='xxxxx', password='xxxx', host='xxxx.xxx.xxx.xx.xxx', port='5432')
conn.set_client_encoding('UNICODE')
cur = conn.cursor()
But I couldn't find any option to set client_encoding in PostgresHook. The connection has an Extra field, and I tried setting it to { encode: 'UNICODE' }, but that throws an error as well. Can anyone help?
client_encoding is a runtime config. That means you should embed it in your SQL statement:
from airflow.providers.postgres.hooks.postgres import PostgresHook

db_hook = PostgresHook('postgres_product_db')
sql = "SET client_encoding = 'UTF8'; SELECT col FROM my_table"
fetch_item = db_hook.get_pandas_df(sql=sql)
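If you'd rather not touch the SQL itself, another option that mirrors your plain-psycopg2 snippet should also work: PostgresHook.get_conn() returns an ordinary psycopg2 connection, so you can set the encoding on it and hand it to pandas yourself. A minimal sketch, assuming request is the SELECT you were already passing:
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

db_hook = PostgresHook('postgres_product_db')
conn = db_hook.get_conn()            # plain psycopg2 connection under the hood
conn.set_client_encoding('UNICODE')  # same call you already use outside Airflow
fetch_item = pd.read_sql(request, con=conn)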
You didn't ask, but for cases where PostgresOperator applies, the usage can be:
from airflow.providers.postgres.operators.postgres import PostgresOperator

op = PostgresOperator(
    task_id="my_task",
    sql=sql,
    runtime_parameters={'set_client_encoding': 'UNICODE'},
)
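For context, here is a minimal DAG sketch showing where that operator could sit; the DAG id, schedule, and query are illustrative, not taken from your setup:
from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="pricing_update",
    start_date=datetime(2022, 4, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    fetch_inventory = PostgresOperator(
        task_id="fetch_inventory",
        postgres_conn_id="postgres_product_db",
        sql="SELECT col FROM my_table",
        runtime_parameters={'set_client_encoding': 'UNICODE'},
    )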
This capability was added in a PR and is available for apache-airflow-providers-postgres>=4.1.0.
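As for the Extra field you tried: the key is likely the problem rather than the idea. PostgresHook forwards unrecognized keys from the connection's Extra on to psycopg2.connect, and libpq accepts client_encoding as a connection parameter, so an Extra value like the one below may also work; treat this as an assumption to verify against your provider version rather than something confirmed on your setup:
{"client_encoding": "utf8"}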