Why are Airflow/Composer macros not interpolated/interpreted?
I'm not sure whether this is normal behaviour, but I am trying to run a command that uses gsutil from the python_callable of a BranchPythonOperator. The command works fine when I run it explicitly from my terminal with a hard-coded GCS path, but as soon as I try to use {{ds_nodash}} and {{run_id}} (Airflow macros) in my DAG, Airflow does not interpret them, as you can see in the logs below.
Here is the code from my DAG definition:
with DAG("DAG_NAME", default_args=default_args, schedule_interval="@hourly", catchup=False) as dag:

    # Buckets
    airflow_bucket = "XXXXX"  # Hidden on purpose
    archive_bucket = "YYYYY"  # Hidden on purpose

    # Paths
    raw_data_path = "raw_data/tc_export/raw/{{ds_nodash}}/{{run_id}}/*"
    airflow_local_dir = "/home/airflow/gcs/data/tc_data/"

    # SFTP & dirs
    sftp_key = "KEY"  # Hidden on purpose
    sftp_remote_directory_root = '/data/from_tc/'

    op_check_if_files_in_sftp = BranchPythonOperator(
        task_id='check_if_files_in_sftp',
        provide_context=True,
        python_callable=check_if_files_in_sftp,
        op_kwargs={'remote_directory_root': sftp_remote_directory_root},
        templates_dict={"sftp_key": sftp_key})

    op_check_if_files_in_bucket = BranchPythonOperator(
        task_id='check_if_files_in_bucket',
        provide_context=True,
        python_callable=check_if_files_in_bucket,
        op_kwargs={'bucket': archive_bucket, 'subdir': raw_data_path})
And here is the function that executes gsutil:
def check_if_files_in_bucket(bucket: str, subdir: str, **kwargs) -> str:
    """
    Check if files already exist in the archives' bucket.
    :param bucket: bucket in which to search
    :param subdir: directory within the bucket
    :param kwargs: additional context parameters.
    :return: id of the next DAG operator
    """
    try:
        logging.info(f"Executing command : gsutil -q stat gs://{bucket}/{subdir}")
        command = subprocess.run(["gsutil", "-q", "stat", f"gs://{bucket}/{subdir}"])
        if command.returncode:
            logging.info(f"Command return code : {command.returncode}. Ending process.")
            return "end_process"
        logging.info(f"There are files within the {bucket}/{subdir}. Proceeding with the next step.")
        return "transfer_to_other_bucket"
    except OSError as os_err:
        logging.exception(os_err)
        exit(1)
    except ValueError as val_err:
        logging.exception(val_err)
        exit(1)
So my questions are:
- When does Airflow render (interpret) the macros?
- How can I fix this?
The problem here is that the templates_dict argument of the BranchPythonOperator was not used. Airflow renders Jinja macros only in the fields declared in an operator's template_fields; for the PythonOperator used here that is templates_dict, so a value passed through op_kwargs is handed to the callable verbatim. Here is the corrected code:
op_check_if_files_in_bucket = BranchPythonOperator(
    task_id='check_if_files_in_bucket',
    provide_context=True,
    python_callable=check_if_files_in_bucket,
    op_kwargs={'bucket': archive_bucket},
    templates_dict={'subdir': raw_data_path})
And the python_callable function:
def check_if_files_in_bucket(bucket: str, **kwargs) -> str:
    """
    Check if files already exist in the archives' bucket.
    :param bucket: bucket in which to search
    :param kwargs: additional context parameters, and the subdirectory in the bucket (via templates_dict).
    :return: id of the next DAG operator
    """
    try:
        subdir = kwargs["templates_dict"]["subdir"]
        cmd_check_files = ["gsutil", "-q", "stat", f"gs://{bucket}/{subdir}"]
        logging.info(f"Executing command : {' '.join(cmd_check_files)}")
        command = subprocess.run(cmd_check_files)
        if command.returncode:
            logging.info(f"Command return code : {command.returncode}. Ending process.")
            return "end_process"
        logging.info(f"There are files within the {bucket}/{subdir}. Proceeding with the next step.")
        return "transfer_to_other_bucket"
    except OSError as os_err:
        logging.exception(os_err)
        exit(1)
    except ValueError as val_err:
        logging.exception(val_err)
        exit(1)
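Since provide_context=True already passes the rendered template context to the callable, the dated path could also be built inside the function from the context values instead of going through templates_dict. A minimal sketch of that variant (hypothetical, not the fix above):

import logging
import subprocess

def check_if_files_in_bucket(bucket: str, **kwargs) -> str:
    # Hypothetical variant: ds_nodash and run_id are available in the context kwargs,
    # so the subdirectory can be assembled here rather than templated by Airflow.
    subdir = f"raw_data/tc_export/raw/{kwargs['ds_nodash']}/{kwargs['run_id']}/*"
    command = subprocess.run(["gsutil", "-q", "stat", f"gs://{bucket}/{subdir}"])
    if command.returncode:
        logging.info(f"No files under gs://{bucket}/{subdir}. Ending process.")
        return "end_process"
    return "transfer_to_other_bucket"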
N.B.: since BranchPythonOperator extends PythonOperator, the same rules apply.
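Which constructor arguments actually get rendered can be checked on the operator class itself; a quick sketch (assuming the Airflow 1.x import path used by Composer at the time):

from airflow.operators.python_operator import PythonOperator, BranchPythonOperator

# template_fields lists the arguments on which Jinja rendering is applied;
# the exact contents depend on the Airflow version.
print(PythonOperator.template_fields)
print(BranchPythonOperator.template_fields)  # inherited from PythonOperator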