如何将数据帧传递到气流任务的临时表中
How to pass dataframes into temp tables for airflow tasks
所以我有一段代码在 python 中针对我们开发的工作流具有多种功能。此工作流采用 CSV 并将其通过数据帧传递。这个数据帧然后通过多个函数传递,这些函数对数据帧应用各种转换。
然而,当将此代码写入气流环境时,由于数据帧在虚拟环境中的工作方式以及跨多台机器的数据 运行,我将无法通过每个函数传递我的数据帧并且必须将它们存储在某个地方?
有谁知道如何在 bigquery 中设置一个临时 table 来为我的每个函数传递一个数据框,这样我就可以 运行 我的 ETL 全部使用我的气流任务?
如果您正在寻找从数据帧输入开始的 Airflow 任务,那么您使用错了。如果您想将脚本作为一个单元执行,您可以使用 PythonOperator
或 BashOperator
,但是如果您想将代码分解为多个任务,您可能需要进行一些重构。
要从 GCS
上的 csv
创建一个 BigQuery
外部 table,您可以将 GCSToBigQueryOperator
中的 external_table
设置为:
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_csv = GCSToBigQueryOperator(
task_id='gcs_to_bigquery_example',
bucket='cloud-samples-data',
source_objects=['bigquery/us-states/us-states.csv'],
destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
schema_fields=[
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
],
write_disposition='WRITE_TRUNCATE',
external_table=True,
)
我不知道您的工作流程中数据框的功能是什么(我假设它是对 csv
进行某种转换),因为您可以使用 GCSFileTransformOperator
(请参阅source code)。此运算符将数据从源 GCS 位置复制到本地文件系统上的临时位置。 运行s 对此文件的转换,由
转换脚本并将输出上传到目标存储桶。如果未指定输出桶,则原始文件将被覆盖。
所以您的工作流程可能是:
- 在 GCS 中归档土地
- 运行
GCSFileTransformOperator
处理和清理记录。
- 使用
GCSToBigQueryOperator
在 BigQuery 中创建一个 table
所以我有一段代码在 python 中针对我们开发的工作流具有多种功能。此工作流采用 CSV 并将其通过数据帧传递。这个数据帧然后通过多个函数传递,这些函数对数据帧应用各种转换。
然而,当将此代码写入气流环境时,由于数据帧在虚拟环境中的工作方式以及跨多台机器的数据 运行,我将无法通过每个函数传递我的数据帧并且必须将它们存储在某个地方?
有谁知道如何在 bigquery 中设置一个临时 table 来为我的每个函数传递一个数据框,这样我就可以 运行 我的 ETL 全部使用我的气流任务?
如果您正在寻找从数据帧输入开始的 Airflow 任务,那么您使用错了。如果您想将脚本作为一个单元执行,您可以使用 PythonOperator
或 BashOperator
,但是如果您想将代码分解为多个任务,您可能需要进行一些重构。
要从 GCS
上的 csv
创建一个 BigQuery
外部 table,您可以将 GCSToBigQueryOperator
中的 external_table
设置为:
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_csv = GCSToBigQueryOperator(
task_id='gcs_to_bigquery_example',
bucket='cloud-samples-data',
source_objects=['bigquery/us-states/us-states.csv'],
destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
schema_fields=[
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
],
write_disposition='WRITE_TRUNCATE',
external_table=True,
)
我不知道您的工作流程中数据框的功能是什么(我假设它是对 csv
进行某种转换),因为您可以使用 GCSFileTransformOperator
(请参阅source code)。此运算符将数据从源 GCS 位置复制到本地文件系统上的临时位置。 运行s 对此文件的转换,由
转换脚本并将输出上传到目标存储桶。如果未指定输出桶,则原始文件将被覆盖。
所以您的工作流程可能是:
- 在 GCS 中归档土地
- 运行
GCSFileTransformOperator
处理和清理记录。 - 使用
GCSToBigQueryOperator
在 BigQuery 中创建一个 table