如何使用 Cloud Composer/Apache Airflow 运行 带有设置文件的 Dataflow 管道?

How can I run a Dataflow pipeline with a setup file using Cloud Composer/Apache Airflow?

我有一个可用的数据流管道,第一个 运行s setup.py 用于安装一些本地帮助程序模块。我现在想使用 Cloud Composer/Apache Airflow 来安排管道。我已经创建了我的 DAG 文件并将其与我的管道项目一起放在指定的 Google Storage DAG 文件夹中。文件夹结构如下所示:

{Composer-Bucket}/
    dags/
       --DAG.py
       Pipeline-Project/
           --Pipeline.py
           --setup.py
           Module1/
              --__init__.py
           Module2/
              --__init__.py
           Module3/
              --__init__.py

我的 DAG 中指定 setup.py 文件的部分如下所示:

resumeparserop = dataflow_operator.DataFlowPythonOperator(
    task_id="resumeparsertask",
    py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
    dataflow_default_options={
        "project": {PROJECT-NAME},    
        "setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})

但是,当我在 Airflow Web UI 中查看日志时,我收到错误消息:

RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.

不知道为什么找不到安装文件。我如何 运行 我的数据流管道与设置 file/modules?

您 运行 Composer 和 Dataflow 使用相同的服务帐户,还是分开的?后一种情况,请问Dataflow的service account是否有bucket和object的读权限?

如果您查看 DataflowPythonOperator 的代码,看起来主 py_file 可以是 GCS 存储桶内的一个文件,并且在执行管道之前由操作员本地化。但是,对于 dataflow_default_options,我没有看到类似的东西。看起来选项只是简单地复制和格式化。

由于 GCS dag 文件夹是使用 Cloud Storage Fuse 安装在 Airflow 实例上的,您应该能够使用 "dags_folder" env var 在本地访问该文件。 也就是说,你可以这样做:

from airflow import configuration
....
LOCAL_SETUP_FILE = os.path.join(
configuration.get('core', 'dags_folder'), 'Pipeline-Project', 'setup.py')

然后您可以在 dataflow_default_options.

中为 setup_file 属性 使用 LOCAL_SETUP_FILE 变量