_pickle.PicklingError when trying to read a table from BigQuery and save it as a dataframe using Airflow
What is the best way to read a BigQuery table from Airflow and save the result as a dataframe?
I have a Python module that reads the table, and it runs fine locally:
from google.cloud import bigquery as bq
from google.oauth2 import service_account

def reading(ds, **kwargs):
    project_id = '{project-id}'
    credentials = service_account.Credentials.from_service_account_file('/usr/local/airflow/extras/credentials.json')
    bqclient = bq.Client(credentials=credentials, project=project_id)
    query_string = bqclient.query("""
        SELECT *
        FROM `{project-id}.{schema-name}.{table-name}`""")
    dataframe = (
        bqclient.query(query_string)
        .result()
        .to_dataframe(
            create_bqstorage_client=True,
        )
    )
    print(dataframe.head())
However, when I try to call the module from an Airflow DAG, I get the error:
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
Here is the full error stack:
*** Reading local file: /usr/local/airflow/logs/test_BQ_read/BQ_reading/2021-09-22T09:16:02.147166+00:00/1.log
[2021-09-22 09:16:05,401] {taskinstance.py:897} INFO - Dependencies all met for <TaskInstance: test_BQ_read.BQ_reading 2021-09-22T09:16:02.147166+00:00 [queued]>
[2021-09-22 09:16:05,434] {taskinstance.py:897} INFO - Dependencies all met for <TaskInstance: test_BQ_read.BQ_reading 2021-09-22T09:16:02.147166+00:00 [queued]>
[2021-09-22 09:16:05,434] {taskinstance.py:1088} INFO -
--------------------------------------------------------------------------------
[2021-09-22 09:16:05,434] {taskinstance.py:1089} INFO - Starting attempt 1 of 1
[2021-09-22 09:16:05,434] {taskinstance.py:1090} INFO -
--------------------------------------------------------------------------------
[2021-09-22 09:16:05,454] {taskinstance.py:1108} INFO - Executing <Task(PythonOperator): BQ_reading> on 2021-09-22T09:16:02.147166+00:00
[2021-09-22 09:16:05,469] {standard_task_runner.py:52} INFO - Started process 37080 to run task
[2021-09-22 09:16:05,492] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'test_BQ_read', 'BQ_reading', '2021-09-22T09:16:02.147166+00:00', '--job-id', '538', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/test_BQ_read.py', '--cfg-path', '/tmp/tmp3m9gbm14', '--error-file', '/tmp/tmps5fthy88']
[2021-09-22 09:16:05,494] {standard_task_runner.py:77} INFO - Job 538: Subtask BQ_reading
[2021-09-22 09:16:05,606] {logging_mixin.py:104} INFO - Running <TaskInstance: test_BQ_read.BQ_reading 2021-09-22T09:16:02.147166+00:00 [running]> on host ffce3e5ffe75
[2021-09-22 09:16:05,737] {taskinstance.py:1303} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_BQ_read
AIRFLOW_CTX_TASK_ID=BQ_reading
AIRFLOW_CTX_EXECUTION_DATE=2021-09-22T09:16:02.147166+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-22T09:16:02.147166+00:00
[2021-09-22 09:16:06,165] {taskinstance.py:1502} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1362, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/test_BQ_read.py", line 43, in reading
result = bqclient.query(query_string)
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 3223, in query
query_job._begin(retry=retry, timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/query.py", line 1138, in _begin
super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/base.py", line 468, in _begin
data=self.to_api_repr(),
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/query.py", line 819, in to_api_repr
configuration = self._configuration.to_api_repr()
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/query.py", line 605, in to_api_repr
resource = copy.deepcopy(self._properties)
File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.7/copy.py", line 241, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.7/copy.py", line 241, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.7/copy.py", line 281, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.7/copy.py", line 241, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.7/copy.py", line 169, in deepcopy
rv = reductor(4)
File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 167, in __getstate__
"Clients have non-trivial state that is local and unpickleable.",
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
[2021-09-22 09:16:06,172] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=test_BQ_read, task_id=BQ_reading, execution_date=20210922T091602, start_date=20210922T091605, end_date=20210922T091606
[2021-09-22 09:16:06,368] {local_task_job.py:153} INFO - Task exited with return code 1
Any suggestions on how to read the table correctly?
I think you need to show the DAG you are using - but my guess is that you are instantiating the Google client in your operator's `__init__` (perhaps you create a Hook there and assign it to a field on the operator). That is wrong - hooks should only be instantiated in the operator's `execute` method, and should never be stored on the operator object at construction time.
Take a look at any of the Google operators: https://github.com/apache/airflow/tree/main/airflow/providers/google/cloud/operators
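A minimal sketch of that pattern (the operator name and connection id are hypothetical, and `BigQueryHook.get_client` is assumed to be available in your google provider version):

from airflow.models.baseoperator import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


class BQToDataFrameOperator(BaseOperator):
    """Hypothetical operator: stores only plain, picklable config."""

    def __init__(self, sql: str, gcp_conn_id: str = "google_cloud_default", **kwargs):
        super().__init__(**kwargs)
        # Only serializable fields here - no hooks, no clients.
        self.sql = sql
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        # The hook (and therefore the client) is created per task run,
        # so the operator object itself never has to be pickled with it.
        hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id)
        client = hook.get_client()  # assumed available in your provider version
        dataframe = client.query(self.sql).result().to_dataframe()
        print(dataframe.head())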
This is what worked for me, after changing the Python module:
from google.cloud import bigquery
from google.oauth2 import service_account
import google.auth
from datetime import datetime, timedelta
from airflow import DAG
from airflow import models
from airflow.models import Variable
import pandas as pd
from airflow.operators.python_operator import PythonOperator

def reading(ds, **kwargs):
    credentials = service_account.Credentials.from_service_account_file('/usr/local/airflow/extras/credentials.json')
    project_id = '{project-id}'
    bqclient = bigquery.Client(credentials=credentials, project=project_id)
    # Download a table.
    table = bigquery.TableReference.from_string(
        "{project-id}.{schema_name}.{table_name}"
    )
    rows = bqclient.list_rows(
        table,
    )
    dataframe = rows.to_dataframe(
        create_bqstorage_client=True,
    )
    print(dataframe.head())

with models.DAG(
    'test_BQ_read',
    schedule_interval=None,
    start_date=datetime(2021, 9, 22),  # a datetime object, not a string
    tags=["example"],
    catchup=False,
) as dag:
    BQ_reading = PythonOperator(
        task_id='BQ_reading',
        python_callable=reading,
    )
It might be related to pandas and BigQuery causing the pickle error.
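Judging from the traceback, though, the likely trigger in the original snippet is that the QueryJob returned by the first `bqclient.query(...)` call is passed back into `query()` a second time; deepcopying the resulting job configuration then hits the unpicklable client. A minimal sketch of the query-based variant with a single `query()` call (the project/schema/table identifiers are placeholders, as in the question):

from google.cloud import bigquery
from google.oauth2 import service_account

def reading(ds, **kwargs):
    credentials = service_account.Credentials.from_service_account_file(
        '/usr/local/airflow/extras/credentials.json')
    bqclient = bigquery.Client(credentials=credentials, project='{project-id}')
    # Keep the SQL as a plain string and call query() exactly once;
    # the original snippet passed the returned QueryJob back into query().
    query_string = """
        SELECT *
        FROM `{project-id}.{schema-name}.{table-name}`"""
    dataframe = (
        bqclient.query(query_string)
        .result()
        .to_dataframe(create_bqstorage_client=True)
    )
    print(dataframe.head())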