是否可以使用 Cloud Data Fusion FTP -> GCS -> BQ
Is it possible to use Cloud Data Fusion FTP -> GCS -> BQ
我是 GCP 和 Cloud Data Fusion 的新手。我看到您可以使用此服务将跨数据源的数据集成到数据湖中。
我有许多 sftp 提供商提供不同结构化格式的文件,例如。 csv、json、parquet 和 avro
最终我希望这些数据在 BQ 中可用。
在加载到 BQ 之前,我的第一站是 Google 云存储,这样我就有了一个 immutable 数据副本。
sftp 站点将有多个文件代表多个 tables。
/root/table_1
/root/table_2
/root/table_3
我首先尝试查看是否使用 Cloud Data Fusion 管道将文件从 SFTP 复制到 GCS。这已被证明具有挑战性。
- 我可以为此使用 Fusion 吗?
- 我是否需要为每个文件提供架构,还是可以推断出来?
- 是否需要手动枚举每个 table?理想情况下,我想将所有文件原样从 SFTP 复制到 GCS
- 进入 GCS 后,我想在 BigQuery 中为每个文件创建一个外部数据源。这可能吗?
请考虑 Cloud Datafusion 是一个 ETL (Extract-Transform-Load) 工具;在这种情况下,管道将处理文件中的数据,而不是文件本身;因此,很难定义一个简单的管道来将文件上传到存储。
为了解决这个问题,我想到了使用 Javascript transform 来使用客户端库;但是,它不允许您导入依赖项;因此,您不能使用它们,因此服务的身份验证步骤可能会很复杂。
同样,由于文件格式不同,似乎必须根据文件类型定义对 separate/treat 文件的转换。
另一方面,我了解到您的使用场景是:
- 从一台或多台 SFTP 服务器提取文件
- 这些文件有不同的格式(csv、json、parquet 和 avro)
- 文件需要上传到云存储
- 每个存储文件都与作为外部源的 BigQuery table 相关联
基于此,我考虑使用像Cloud Composer这样的编排工具的更好选择。
Airflow 使用 DAG(有向无环图)作为您想要 运行 的所有任务的集合,其组织方式反映了它们之间的关系和依赖关系;在这种情况下,您的 DAG 将执行以下任务:
- 使用sftp_operator从工人内部的sftp服务器获取文件
- 一旦文件在 workers 中,您就可以在您的项目中使用 PythonOperator to use the Cloud Storage library to upload the files to a bucket。
- 文件存储在存储中后,您可以使用 PythonOperator 来使用 BigQuery 库,或者使用 BashOperator 来使用 bq 加载 create the tables with each file as external source
另一个好处是您不再需要担心文件类型,因为您提到的所有格式都是 currently supported 可以直接从它们创建 table。
我是 GCP 和 Cloud Data Fusion 的新手。我看到您可以使用此服务将跨数据源的数据集成到数据湖中。
我有许多 sftp 提供商提供不同结构化格式的文件,例如。 csv、json、parquet 和 avro
最终我希望这些数据在 BQ 中可用。
在加载到 BQ 之前,我的第一站是 Google 云存储,这样我就有了一个 immutable 数据副本。
sftp 站点将有多个文件代表多个 tables。
/root/table_1
/root/table_2
/root/table_3
我首先尝试查看是否使用 Cloud Data Fusion 管道将文件从 SFTP 复制到 GCS。这已被证明具有挑战性。
- 我可以为此使用 Fusion 吗?
- 我是否需要为每个文件提供架构,还是可以推断出来?
- 是否需要手动枚举每个 table?理想情况下,我想将所有文件原样从 SFTP 复制到 GCS
- 进入 GCS 后,我想在 BigQuery 中为每个文件创建一个外部数据源。这可能吗?
请考虑 Cloud Datafusion 是一个 ETL (Extract-Transform-Load) 工具;在这种情况下,管道将处理文件中的数据,而不是文件本身;因此,很难定义一个简单的管道来将文件上传到存储。
为了解决这个问题,我想到了使用 Javascript transform 来使用客户端库;但是,它不允许您导入依赖项;因此,您不能使用它们,因此服务的身份验证步骤可能会很复杂。
同样,由于文件格式不同,似乎必须根据文件类型定义对 separate/treat 文件的转换。
另一方面,我了解到您的使用场景是:
- 从一台或多台 SFTP 服务器提取文件
- 这些文件有不同的格式(csv、json、parquet 和 avro)
- 文件需要上传到云存储
- 每个存储文件都与作为外部源的 BigQuery table 相关联
基于此,我考虑使用像Cloud Composer这样的编排工具的更好选择。
Airflow 使用 DAG(有向无环图)作为您想要 运行 的所有任务的集合,其组织方式反映了它们之间的关系和依赖关系;在这种情况下,您的 DAG 将执行以下任务:
- 使用sftp_operator从工人内部的sftp服务器获取文件
- 一旦文件在 workers 中,您就可以在您的项目中使用 PythonOperator to use the Cloud Storage library to upload the files to a bucket。
- 文件存储在存储中后,您可以使用 PythonOperator 来使用 BigQuery 库,或者使用 BashOperator 来使用 bq 加载 create the tables with each file as external source
另一个好处是您不再需要担心文件类型,因为您提到的所有格式都是 currently supported 可以直接从它们创建 table。