访问修改后的气流变量作为 AIRFLOW 中 S3 传感器的自定义参数

Accessing modified airflow variable as a custom parameter for S3 sensor in AIRFLOW

让我看看我能不能解释一下这个问题。

看,我从客户那里收到的 S3 文件格式不正确。例如,日期以短破折号显示,如“2017_07_10”。

因为我想访问它们以便能够下载它们,所以首先我有一个任务是气流中的 S3 传感器。看起来是这样的:

xxx = S3KeySensor(
    task_id='task_name',
    bucket_key=BUCKET_KEY,
    wildcard_match=True,
    params={'yesterday_ds_formatted': ????},
    provide_context=True,
    bucket_name=BUCKET_NAME,
    s3_conn_id=S3_CONN_ID,
    timeout=18 * 60 * 60,
    poke_interval=120,
    dag=dag)

在 Airflow 控制台的变量部分,我有 bucket_key 和模板变量 {{yesterday_ds_formatted}}。

例如: 'folder1/folder2/folder3/blablablablabla-{{params.yesterday_ds_formatted}}*.csv

我需要修改该模板变量或其他内容,以便它获取 {{yesterday_ds}} 并将“-”替换为“_”。

伙计们,我该怎么做?我无法让它工作...... 我尝试在设置参数时调用自定义 python 函数,但后来我无法访问 "ds",甚至无法使用 kwargs。可以这么说,我似乎无法事先访问模板变量。

谢谢!!

如果我没理解错的话,您想对 bucket_key 参数使用 jinja 模板,但 S3KeySensor 不支持。

一个简单的方法是从 S3KeySensor 继承自定义传感器,如下所示:

TemplatedS3KeySensor(S3KeySensor):
  template_fields = ('bucket_key',)