Why are Airflow/Composer macros not interpolated/interpreted?

I'm not sure whether this is normal behaviour, but I am trying to run a command that uses gsutil inside the python_callable of a BranchPythonOperator. The command works fine when I run it explicitly from my terminal with a hard-coded GCS path, but as soon as I try to run it from my DAG using {{ds_nodash}} and {{run_id}} (Airflow macros), Airflow does not interpret them, as you can see in the logs below.

Here is the code from my DAG definition:

with DAG("DAG_NAME", default_args=default_args, schedule_interval="@hourly", catchup=False) as dag:
    # Buckets
    airflow_bucket = "XXXXX"  # Hidden on purpose
    archive_bucket = "YYYYY"  # Hidden on purpose

    # Paths
    raw_data_path = "raw_data/tc_export/raw/{{ds_nodash}}/{{run_id}}/*"
    airflow_local_dir = "/home/airflow/gcs/data/tc_data/"

    # SFTP & dirs
    sftp_key = "KEY"  # Hidden on purpose
    sftp_remote_directory_root = '/data/from_tc/'

    op_check_if_files_in_sftp = BranchPythonOperator(
        task_id='check_if_files_in_sftp',
        provide_context=True,
        python_callable=check_if_files_in_sftp,
        op_kwargs={'remote_directory_root': sftp_remote_directory_root},
        templates_dict={"sftp_key": sftp_key})
    op_check_if_files_in_bucket = BranchPythonOperator(
        task_id='check_if_files_in_bucket',
        provide_context=True,
        python_callable=check_if_files_in_bucket,
        op_kwargs={'bucket': archive_bucket, 'subdir': raw_data_path})

And here is the function that executes gsutil:

def check_if_files_in_bucket(bucket: str, subdir: str, **kwargs) -> str:
    """
    Check if files already exist in the archives' bucket.

    :param bucket: bucket in which to search
    :param subdir: directory within the bucket
    :param kwargs: additional context parameters.
    :return: id of the next DAG operator
    """
    try:
        logging.info(f"Executing command : gsutil -q  stat gs://{bucket}/{subdir}")
        command = subprocess.run(["gsutil", "-q", "stat", f"gs://{bucket}/{subdir}"])
        if command.returncode:
            logging.info(f"Command return code : {command.returncode}. Ending process.")
            return "end_process"
        logging.info(f"There are files within the {bucket}/{subdir}. Proceeding with the next step.")
        return "transfer_to_other_bucket"
    except OSError as os_err:
        logging.exception(os_err)
        exit(1)
    except ValueError as val_err:
        logging.exception(val_err)
        exit(1)

So my questions are:

  1. When does Airflow interpret/interpolate macros?
  2. How can I fix this?

The issue here was that the templates_dict parameter was not being used in the BranchPythonOperator. Here is the corrected code:

op_check_if_files_in_bucket = BranchPythonOperator(
    task_id='check_if_files_in_bucket',
    provide_context=True,
    python_callable=check_if_files_in_bucket,
    op_kwargs={'bucket': archive_bucket},
    templates_dict={'subdir': raw_data_path})

And the python_callable function:

import logging
import subprocess


def check_if_files_in_bucket(bucket: str, **kwargs) -> str:
    """
    Check if files already exist in the archives' bucket.

    :param bucket: bucket in which to search
    :param kwargs: additional context parameters, including the subdirectory in the bucket (passed through templates_dict).
    :return: id of the next DAG operator
    """
    try:
        subdir = kwargs["templates_dict"]["subdir"]
        cmd_check_files = ["gsutil", "-q", "stat", f"gs://{bucket}/{subdir}"]

        logging.info(f"Executing command : {' '.join(cmd_check_files)}")
        command = subprocess.run(cmd_check_files)
        if command.returncode:
            logging.info(f"Command return code : {command.returncode}. Ending process.")
            return "end_process"
        logging.info(f"There are files within the {bucket}/{subdir}. Proceeding with the next step.")
        return "transfer_to_other_bucket"
    except OSError as os_err:
        logging.exception(os_err)
        exit(1)
    except ValueError as val_err:
        logging.exception(val_err)
        exit(1)

N.B.: Since BranchPythonOperator extends PythonOperator, the same rules apply.
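
To make the answer to question 1 explicit: Airflow renders Jinja macros only for the attributes listed in an operator's template_fields, and it does so at task run time rather than at DAG parse time. For the PythonOperator used here that list includes templates_dict, which is why moving subdir there works, while the same string left in op_kwargs stayed a literal "{{ds_nodash}}" (newer Airflow releases may also template op_args and op_kwargs, so check the template_fields of your version). Below is a minimal, self-contained sketch of that difference; the DAG id and callable are hypothetical and follow the Airflow 1.x style used above.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path


def show_rendered_values(**kwargs):
    # Rendered: 'templates_dict' is in the operator's template_fields,
    # so this value has gone through Jinja by the time the task runs.
    rendered = kwargs["templates_dict"]["subdir"]
    # Not rendered in the Airflow version used in this question: this
    # op_kwargs value arrives as the literal string "raw_data/{{ ds_nodash }}/*".
    raw = kwargs["raw_subdir"]
    print(rendered, raw)


with DAG("demo_template_rendering",  # hypothetical DAG id
         start_date=datetime(2020, 1, 1),
         schedule_interval=None,
         catchup=False) as dag:
    op_demo = PythonOperator(
        task_id="show_rendered_values",
        provide_context=True,
        python_callable=show_rendered_values,
        templates_dict={"subdir": "raw_data/{{ ds_nodash }}/*"},  # rendered
        op_kwargs={"raw_subdir": "raw_data/{{ ds_nodash }}/*"})   # left as-is here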