Airflow DAG - 如何先检查 BQ(必要时删除)然后 运行 数据流作业?

Airflow DAG - how to check BQ first (delete if necessary) and then run dataflow job?

我正在使用 cloud composer 为到达 GCS 并转到 BigQuery 的文件编排 ETL。我有一个云函数,它在文件到达时触发 dag,并且云函数将文件 name/location 传递给 DAG。在我的 DAG 中,我有 2 个任务:

1)使用DataflowPythonOperator到运行一个数据流作业,从GCS中的文本中读取数据并将其转换并输入到BQ中, 2) 根据作业是失败还是成功将文件移动到 failure/success 存储桶。 每个文件都有一个文件 ID,它是 bigquery table 中的一列。有时一个文件会被编辑一两次(这不是经常发生的流式传输),我希望能够先删除该文件的现有记录。

我查看了其他气流运算符,但想在 运行 数据流作业之前在我的 DAG 中执行 2 个任务:

  1. 根据文件名获取文件ID(现在我有一个bigquerytable映射文件名->文件ID,但我也可以只带入一个 json 作为地图,我想这是否更容易)
  2. 如果文件ID已经存在于bigquerytable(从数据流作业输出转换数据的table),删除它,然后运行 数据流作业 所以我有最新的信息。我知道一个选择是只添加一个时间戳并且只使用最新的记录,但是因为每个文件可能有 100 万条记录而且它不像我每天删除 100 个文件(可能是 1-2 个顶部)看起来它可能是混乱和混乱的。

在数据流作业之后,最好是在将文件移动到 success/failure 文件夹之前,我想附加一些“记录” table 说明这个游戏是在这个时候输入的。这将是我查看发生的所有插入的方式。 我试图寻找不同的方法来做到这一点,我是 cloud composer 的新手,所以在经过 10 多个小时的研究后,我并不清楚这将如何工作,否则我会 post 代码输入。

谢谢,我非常感谢大家的帮助,如果这不是你想要的那么清楚,我深表歉意,关于气流的文档非常强大,但考虑到云作曲家和 bigquery 相对较新,很难学习彻底了解如何完成一些 GCP 特定任务。

听起来有点复杂。令人高兴的是,几乎所有 GCP 服务都有运营商。另一件事是什么时候触发 DAG 执行。你想出来了吗?每次有新文件进入该 GCS 存储桶时,您都希望触发 Google 云函数 运行。

  1. 正在触发您的 DAG

要触发 DAG,您需要使用依赖于 Object Finalize or Metadata Update 触发器的 Google 云函数来调用它。

  1. 正在将数据加载到 BigQuery

如果您的文件已经在 GCS 中,并且是 JSON 或 CSV 格式,那么使用 Dataflow 作业就有点过分了。您可以使用 GoogleCloudStorageToBigQueryOperator 将文件加载到 BQ。

  1. 跟踪文件 ID

可能计算文件 ID 的最佳方法是使用 Airflow 中的 Bash 或 Python 运算符。能直接从文件名推导出来吗?

如果是这样,那么您可以在 GoogleCloudStorageObjectSensor 的上游设置一个 Python 运算符来检查文件是否在成功的目录中。

如果是,那么您可以使用BigQueryOperator到运行BQ上的删除查询。

之后,您 运行 GoogleCloudStorageToBigQueryOperator。

  1. 四处移动文件

如果您要将文件从 GCS 移动到 GCS 位置,那么 GoogleCloudStorageToGoogleCloudStorageOperator 应该可以满足您的需要。如果您的 BQ 加载运算符失败,则移动到失败的文件位置,如果成功则移动到成功的作业位置。

  1. 记录任务日志

也许您需要跟踪插入的所有内容就是将任务信息记录到 GCS。查看 how to log task information to GCS

有帮助吗?