是否可以使用 Cloud Data Fusion FTP -> GCS -> BQ

Is it possible to use Cloud Data Fusion FTP -> GCS -> BQ

我是 GCP 和 Cloud Data Fusion 的新手。我看到您可以使用此服务将跨数据源的数据集成到数据湖中。

我有许多 sftp 提供商提供不同结构化格式的文件,例如。 csv、json、parquet 和 avro

最终我希望这些数据在 BQ 中可用。

在加载到 BQ 之前,我的第一站是 Google 云存储,这样我就有了一个 immutable 数据副本。

sftp 站点将有多个文件代表多个 tables。

/root/table_1
/root/table_2
/root/table_3

我首先尝试查看是否使用 Cloud Data Fusion 管道将文件从 SFTP 复制到 GCS。这已被证明具有挑战性。

  1. 我可以为此使用 Fusion 吗?
  2. 我是否需要为每个文件提供架构,还是可以推断出来?
  3. 是否需要手动枚举每个 table?理想情况下,我想将所有文件原样从 SFTP 复制到 GCS
  4. 进入 GCS 后,我想在 BigQuery 中为每个文件创建一个外部数据源。这可能吗?

请考虑 Cloud Datafusion 是一个 ETL (Extract-Transform-Load) 工具;在这种情况下,管道将处理文件中的数据,而不是文件本身;因此,很难定义一个简单的管道来将文件上传到存储。

为了解决这个问题,我想到了使用 Javascript transform 来使用客户端库;但是,它不允许您导入依赖项;因此,您不能使用它们,因此服务的身份验证步骤可能会很复杂。

同样,由于文件格式不同,似乎必须根据文件类型定义对 separate/treat 文件的转换。

另一方面,我了解到您的使用场景是:

  1. 从一台或多台 SFTP 服务器提取文件
  2. 这些文件有不同的格式(csv、json、parquet 和 avro)
  3. 文件需要上传到云存储
  4. 每个存储文件都与作为外部源的 BigQuery table 相关联

基于此,我考虑使用像Cloud Composer这样的编排工具的更好选择。

Airflow 使用 DAG(有向无环图)作为您想要 运行 的所有任务的集合,其组织方式反映了它们之间的关系和依赖关系;在这种情况下,您的 DAG 将执行以下任务:

  1. 使用sftp_operator从工人内部的sftp服务器获取文件
  2. 一旦文件在 workers 中,您就可以在您的项目中使用 PythonOperator to use the Cloud Storage library to upload the files to a bucket
  3. 文件存储在存储中后,您可以使用 PythonOperator 来使用 BigQuery 库,或者使用 BashOperator 来使用 bq 加载 create the tables with each file as external source

另一个好处是您不再需要担心文件类型,因为您提到的所有格式都是 currently supported 可以直接从它们创建 table。