如何在 Airflow 的每个循环中获取统一的当前日期时间?

How to get the uniform current date time throughout each loop in Airflow?

以下是使用的DAG格式, 我在 DAG 中使用 for 循环输入文件并使用

重命名输出文件
CurrentDateTime = datetime.datetime.today().strftime("%Y%m%d%H%M%S")

#if currentDateTIme = 20210406010203

outputfile = ''
outputfile1 = ''
outputfile2 = ''
inputfiles = ['input1', 'input2']

API_CALL_TASK1 = {
source : inputfile1
filename : outputfile1 #20210406010513
}

API_CALL_TASK2 = {
source : inputfile2
filename : outputfile2 
}

for file in inputfiles:
    if file == 'input1'
        outputfile1 = f'inputFileName_{CurrentDateTime}' #20210406010303
        outputfile = f'inputFileName_{CurrentDateTime}' #20210406010303
    if file == 'input2'
        outputfile2 = f'inputFileName_{CurrentDateTime}' 
        outputfile = f'inputFileName_{CurrentDateTime}'        
    MOVE_OUTPUT_TO_BUCKET_TASK = (
                                 filename = f{outputfile} #20210406010423
                                 )

MOVE_OUTPUT_TO_BUCKET_TASK >> API_CALL_TASK1 >> API_CALL_TASK1

此处的任务 - API_CALL_TASK1API_CALL_TASK2MOVE_OUTPUT_TO_BUCKET_TASK 文件名中的日期时间不同,因为触发每个任务的时间不同。

如何获取每个文件的统一日期时间?

我想将相同的文件名从循环传递到 MOVE_OUTPUT_TO_BUCKET_TASKAPI_CALL_TASK1API_CALL_TASK2

据我了解,您想为循环中的两个输出传递格式为 YYYYMMDD 的相同日期时间。

Airflow 为我们提供了一组默认变量,可以跨所有模板使用。因此,您可以使用它在整个模板中传递一个常量值。在您的情况下,我看到您希望执行日期作为输出的后缀。因此,您应该根据 documentation 使用 ds_nodash,它会检索 the execution date as YYYYMMDD.

如果您使用带有 **kwargs 的 Python 运算符,您可以通过 kwargs['ds_nodash'].

访问它