如何在 Airflow 的每个循环中获取统一的当前日期时间?
How to get the uniform current date time throughout each loop in Airflow?
以下是使用的DAG格式,
我在 DAG 中使用 for 循环输入文件并使用
重命名输出文件
CurrentDateTime = datetime.datetime.today().strftime("%Y%m%d%H%M%S")
#if currentDateTIme = 20210406010203
outputfile = ''
outputfile1 = ''
outputfile2 = ''
inputfiles = ['input1', 'input2']
API_CALL_TASK1 = {
source : inputfile1
filename : outputfile1 #20210406010513
}
API_CALL_TASK2 = {
source : inputfile2
filename : outputfile2
}
for file in inputfiles:
if file == 'input1'
outputfile1 = f'inputFileName_{CurrentDateTime}' #20210406010303
outputfile = f'inputFileName_{CurrentDateTime}' #20210406010303
if file == 'input2'
outputfile2 = f'inputFileName_{CurrentDateTime}'
outputfile = f'inputFileName_{CurrentDateTime}'
MOVE_OUTPUT_TO_BUCKET_TASK = (
filename = f{outputfile} #20210406010423
)
MOVE_OUTPUT_TO_BUCKET_TASK >> API_CALL_TASK1 >> API_CALL_TASK1
此处的任务 - API_CALL_TASK1
、API_CALL_TASK2
和 MOVE_OUTPUT_TO_BUCKET_TASK
文件名中的日期时间不同,因为触发每个任务的时间不同。
如何获取每个文件的统一日期时间?
我想将相同的文件名从循环传递到 MOVE_OUTPUT_TO_BUCKET_TASK
和 API_CALL_TASK1
或 API_CALL_TASK2
据我了解,您想为循环中的两个输出传递格式为 YYYYMMDD
的相同日期时间。
Airflow 为我们提供了一组默认变量,可以跨所有模板使用。因此,您可以使用它在整个模板中传递一个常量值。在您的情况下,我看到您希望执行日期作为输出的后缀。因此,您应该根据 documentation 使用 ds_nodash
,它会检索 the execution date as YYYYMMDD
.
如果您使用带有 **kwargs 的 Python 运算符,您可以通过 kwargs['ds_nodash']
.
访问它
以下是使用的DAG格式, 我在 DAG 中使用 for 循环输入文件并使用
重命名输出文件CurrentDateTime = datetime.datetime.today().strftime("%Y%m%d%H%M%S")
#if currentDateTIme = 20210406010203
outputfile = ''
outputfile1 = ''
outputfile2 = ''
inputfiles = ['input1', 'input2']
API_CALL_TASK1 = {
source : inputfile1
filename : outputfile1 #20210406010513
}
API_CALL_TASK2 = {
source : inputfile2
filename : outputfile2
}
for file in inputfiles:
if file == 'input1'
outputfile1 = f'inputFileName_{CurrentDateTime}' #20210406010303
outputfile = f'inputFileName_{CurrentDateTime}' #20210406010303
if file == 'input2'
outputfile2 = f'inputFileName_{CurrentDateTime}'
outputfile = f'inputFileName_{CurrentDateTime}'
MOVE_OUTPUT_TO_BUCKET_TASK = (
filename = f{outputfile} #20210406010423
)
MOVE_OUTPUT_TO_BUCKET_TASK >> API_CALL_TASK1 >> API_CALL_TASK1
此处的任务 - API_CALL_TASK1
、API_CALL_TASK2
和 MOVE_OUTPUT_TO_BUCKET_TASK
文件名中的日期时间不同,因为触发每个任务的时间不同。
如何获取每个文件的统一日期时间?
我想将相同的文件名从循环传递到 MOVE_OUTPUT_TO_BUCKET_TASK
和 API_CALL_TASK1
或 API_CALL_TASK2
据我了解,您想为循环中的两个输出传递格式为 YYYYMMDD
的相同日期时间。
Airflow 为我们提供了一组默认变量,可以跨所有模板使用。因此,您可以使用它在整个模板中传递一个常量值。在您的情况下,我看到您希望执行日期作为输出的后缀。因此,您应该根据 documentation 使用 ds_nodash
,它会检索 the execution date as YYYYMMDD
.
如果您使用带有 **kwargs 的 Python 运算符,您可以通过 kwargs['ds_nodash']
.