访问修改后的气流变量作为 AIRFLOW 中 S3 传感器的自定义参数
Accessing modified airflow variable as a custom parameter for S3 sensor in AIRFLOW
让我看看我能不能解释一下这个问题。
看,我从客户那里收到的 S3 文件格式不正确。例如,日期以短破折号显示,如“2017_07_10”。
因为我想访问它们以便能够下载它们,所以首先我有一个任务是气流中的 S3 传感器。看起来是这样的:
xxx = S3KeySensor(
task_id='task_name',
bucket_key=BUCKET_KEY,
wildcard_match=True,
params={'yesterday_ds_formatted': ????},
provide_context=True,
bucket_name=BUCKET_NAME,
s3_conn_id=S3_CONN_ID,
timeout=18 * 60 * 60,
poke_interval=120,
dag=dag)
在 Airflow 控制台的变量部分,我有 bucket_key 和模板变量 {{yesterday_ds_formatted}}。
例如: 'folder1/folder2/folder3/blablablablabla-{{params.yesterday_ds_formatted}}*.csv
我需要修改该模板变量或其他内容,以便它获取 {{yesterday_ds}} 并将“-”替换为“_”。
伙计们,我该怎么做?我无法让它工作......
我尝试在设置参数时调用自定义 python 函数,但后来我无法访问 "ds",甚至无法使用 kwargs。可以这么说,我似乎无法事先访问模板变量。
谢谢!!
如果我没理解错的话,您想对 bucket_key
参数使用 jinja 模板,但 S3KeySensor 不支持。
一个简单的方法是从 S3KeySensor
继承自定义传感器,如下所示:
TemplatedS3KeySensor(S3KeySensor):
template_fields = ('bucket_key',)
让我看看我能不能解释一下这个问题。
看,我从客户那里收到的 S3 文件格式不正确。例如,日期以短破折号显示,如“2017_07_10”。
因为我想访问它们以便能够下载它们,所以首先我有一个任务是气流中的 S3 传感器。看起来是这样的:
xxx = S3KeySensor(
task_id='task_name',
bucket_key=BUCKET_KEY,
wildcard_match=True,
params={'yesterday_ds_formatted': ????},
provide_context=True,
bucket_name=BUCKET_NAME,
s3_conn_id=S3_CONN_ID,
timeout=18 * 60 * 60,
poke_interval=120,
dag=dag)
在 Airflow 控制台的变量部分,我有 bucket_key 和模板变量 {{yesterday_ds_formatted}}。
例如: 'folder1/folder2/folder3/blablablablabla-{{params.yesterday_ds_formatted}}*.csv
我需要修改该模板变量或其他内容,以便它获取 {{yesterday_ds}} 并将“-”替换为“_”。
伙计们,我该怎么做?我无法让它工作...... 我尝试在设置参数时调用自定义 python 函数,但后来我无法访问 "ds",甚至无法使用 kwargs。可以这么说,我似乎无法事先访问模板变量。
谢谢!!
如果我没理解错的话,您想对 bucket_key
参数使用 jinja 模板,但 S3KeySensor 不支持。
一个简单的方法是从 S3KeySensor
继承自定义传感器,如下所示:
TemplatedS3KeySensor(S3KeySensor):
template_fields = ('bucket_key',)