Airflow DAG - 如何先检查 BQ(必要时删除)然后 运行 数据流作业?
Airflow DAG - how to check BQ first (delete if necessary) and then run dataflow job?
我正在使用 cloud composer 为到达 GCS 并转到 BigQuery 的文件编排 ETL。我有一个云函数,它在文件到达时触发 dag,并且云函数将文件 name/location 传递给 DAG。在我的 DAG 中,我有 2 个任务:
1)使用DataflowPythonOperator
到运行一个数据流作业,从GCS中的文本中读取数据并将其转换并输入到BQ中, 2) 根据作业是失败还是成功将文件移动到 failure/success 存储桶。
每个文件都有一个文件 ID,它是 bigquery table 中的一列。有时一个文件会被编辑一两次(这不是经常发生的流式传输),我希望能够先删除该文件的现有记录。
我查看了其他气流运算符,但想在 运行 数据流作业之前在我的 DAG 中执行 2 个任务:
- 根据文件名获取文件ID(现在我有一个bigquerytable映射文件名->文件ID,但我也可以只带入一个 json 作为地图,我想这是否更容易)
- 如果文件ID已经存在于bigquerytable(从数据流作业输出转换数据的table),删除它,然后运行 数据流作业 所以我有最新的信息。我知道一个选择是只添加一个时间戳并且只使用最新的记录,但是因为每个文件可能有 100 万条记录而且它不像我每天删除 100 个文件(可能是 1-2 个顶部)看起来它可能是混乱和混乱的。
在数据流作业之后,最好是在将文件移动到 success/failure 文件夹之前,我想附加一些“记录” table 说明这个游戏是在这个时候输入的。这将是我查看发生的所有插入的方式。
我试图寻找不同的方法来做到这一点,我是 cloud composer 的新手,所以在经过 10 多个小时的研究后,我并不清楚这将如何工作,否则我会 post 代码输入。
谢谢,我非常感谢大家的帮助,如果这不是你想要的那么清楚,我深表歉意,关于气流的文档非常强大,但考虑到云作曲家和 bigquery 相对较新,很难学习彻底了解如何完成一些 GCP 特定任务。
听起来有点复杂。令人高兴的是,几乎所有 GCP 服务都有运营商。另一件事是什么时候触发 DAG 执行。你想出来了吗?每次有新文件进入该 GCS 存储桶时,您都希望触发 Google 云函数 运行。
- 正在触发您的 DAG
要触发 DAG,您需要使用依赖于 Object Finalize or Metadata Update 触发器的 Google 云函数来调用它。
- 正在将数据加载到 BigQuery
如果您的文件已经在 GCS 中,并且是 JSON 或 CSV 格式,那么使用 Dataflow 作业就有点过分了。您可以使用 GoogleCloudStorageToBigQueryOperator 将文件加载到 BQ。
- 跟踪文件 ID
可能计算文件 ID 的最佳方法是使用 Airflow 中的 Bash 或 Python 运算符。能直接从文件名推导出来吗?
如果是这样,那么您可以在 GoogleCloudStorageObjectSensor 的上游设置一个 Python 运算符来检查文件是否在成功的目录中。
如果是,那么您可以使用BigQueryOperator到运行BQ上的删除查询。
之后,您 运行 GoogleCloudStorageToBigQueryOperator。
- 四处移动文件
如果您要将文件从 GCS 移动到 GCS 位置,那么 GoogleCloudStorageToGoogleCloudStorageOperator 应该可以满足您的需要。如果您的 BQ 加载运算符失败,则移动到失败的文件位置,如果成功则移动到成功的作业位置。
- 记录任务日志
也许您需要跟踪插入的所有内容就是将任务信息记录到 GCS。查看 how to log task information to GCS
有帮助吗?
我正在使用 cloud composer 为到达 GCS 并转到 BigQuery 的文件编排 ETL。我有一个云函数,它在文件到达时触发 dag,并且云函数将文件 name/location 传递给 DAG。在我的 DAG 中,我有 2 个任务:
1)使用DataflowPythonOperator
到运行一个数据流作业,从GCS中的文本中读取数据并将其转换并输入到BQ中, 2) 根据作业是失败还是成功将文件移动到 failure/success 存储桶。
每个文件都有一个文件 ID,它是 bigquery table 中的一列。有时一个文件会被编辑一两次(这不是经常发生的流式传输),我希望能够先删除该文件的现有记录。
我查看了其他气流运算符,但想在 运行 数据流作业之前在我的 DAG 中执行 2 个任务:
- 根据文件名获取文件ID(现在我有一个bigquerytable映射文件名->文件ID,但我也可以只带入一个 json 作为地图,我想这是否更容易)
- 如果文件ID已经存在于bigquerytable(从数据流作业输出转换数据的table),删除它,然后运行 数据流作业 所以我有最新的信息。我知道一个选择是只添加一个时间戳并且只使用最新的记录,但是因为每个文件可能有 100 万条记录而且它不像我每天删除 100 个文件(可能是 1-2 个顶部)看起来它可能是混乱和混乱的。
在数据流作业之后,最好是在将文件移动到 success/failure 文件夹之前,我想附加一些“记录” table 说明这个游戏是在这个时候输入的。这将是我查看发生的所有插入的方式。 我试图寻找不同的方法来做到这一点,我是 cloud composer 的新手,所以在经过 10 多个小时的研究后,我并不清楚这将如何工作,否则我会 post 代码输入。
谢谢,我非常感谢大家的帮助,如果这不是你想要的那么清楚,我深表歉意,关于气流的文档非常强大,但考虑到云作曲家和 bigquery 相对较新,很难学习彻底了解如何完成一些 GCP 特定任务。
听起来有点复杂。令人高兴的是,几乎所有 GCP 服务都有运营商。另一件事是什么时候触发 DAG 执行。你想出来了吗?每次有新文件进入该 GCS 存储桶时,您都希望触发 Google 云函数 运行。
- 正在触发您的 DAG
要触发 DAG,您需要使用依赖于 Object Finalize or Metadata Update 触发器的 Google 云函数来调用它。
- 正在将数据加载到 BigQuery
如果您的文件已经在 GCS 中,并且是 JSON 或 CSV 格式,那么使用 Dataflow 作业就有点过分了。您可以使用 GoogleCloudStorageToBigQueryOperator 将文件加载到 BQ。
- 跟踪文件 ID
可能计算文件 ID 的最佳方法是使用 Airflow 中的 Bash 或 Python 运算符。能直接从文件名推导出来吗?
如果是这样,那么您可以在 GoogleCloudStorageObjectSensor 的上游设置一个 Python 运算符来检查文件是否在成功的目录中。
如果是,那么您可以使用BigQueryOperator到运行BQ上的删除查询。
之后,您 运行 GoogleCloudStorageToBigQueryOperator。
- 四处移动文件
如果您要将文件从 GCS 移动到 GCS 位置,那么 GoogleCloudStorageToGoogleCloudStorageOperator 应该可以满足您的需要。如果您的 BQ 加载运算符失败,则移动到失败的文件位置,如果成功则移动到成功的作业位置。
- 记录任务日志
也许您需要跟踪插入的所有内容就是将任务信息记录到 GCS。查看 how to log task information to GCS
有帮助吗?