Airflow GCSFileTransformOperator 源对象文件名通配符

Airflow GCSFileTransformOperator source object filename wildcard

我正在开发一个 DAG,它应该读取 xml 文件,对其进行一些转换并将结果作为 CSV 格式。为此,我使用 GCSFileTransformOperator.

示例:

    xml_to_csv = GCSFileTransformOperator(
    task_id=f'xml_to_csv',
        source_bucket='source_bucket',
        source_object=(
            f'raw/dt=2022-01-19/File_20220119_4302.xml'
        ),
        destination_bucket='destination_bucket',
        destination_object=f'csv_format/dt=2022-01-19/File_20220119_4302.csv',
        transform_script=[
            '/path_to_script/transform_script.py'
        ],
)

我的问题是文件名以每天不同的 4 位数字结尾 (File_20220119_4302)。第二天的数字会有所不同。 我可以使用执行日期模板:{{ ds }}{{ ds_nodash }},但不确定如何处理数字。 我试过像 File_20220119_*.xml 这样的通配符,但没有成功。

我在操作员 GCSFileTransformOperator code and I dont think current wildcards will likely work as the current templates are fixed values based on the time of execution as described on templates reference 页面上挖掘,源文件将有一个完全不同的文件名。

我的解决方案是使用 python 运算符作为附加步骤,它可以首先找到您的输入文件。根据您的气流版本,您可以使用 TASKFLOW API or XCOM 来传递文件名数据。

def look_file(*args, **kwargs):
    # look for file
    return {'file_found': filefounpath}

file_found = PythonOperator(
    task_id='file_searcher',
    python_callable=look_file,
    dag=dag,
)

xml_to_csv = GCSFileTransformOperator(
    task_id=f'xml_to_csv',
        source_bucket='source_bucket',
        source_object=(
            raw/dt=file_found
        ),
        destination_bucket='destination_bucket',
        destination_object=f'csv_format/dt=2022-01-19/File_20220119_4302.csv',
        transform_script=[
            '/path_to_script/transform_script.py'
        ],
)