Can I use a Python def in Airflow? Or is this not considered good practice?

If I need to load multiple tables, can I create a Python function like this, or is that not good practice?

    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator


    def bqLoad(task, table_id, fileList):
        """Build a GCS-to-BigQuery load task for the given table."""
        load = GoogleCloudStorageToBigQueryOperator(
            task_id=task,
            bucket="bucket_name",
            destination_project_dataset_table="{}.{}.{}".format("project_id", "dataset_id", table_id),
            source_format="PARQUET",
            source_objects=fileList,
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_APPEND",
            dag=dag,
        )
        return load
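
For example, to load a couple of tables I would just call it once per table, something like this (the table names and file paths here are only placeholders, and dag is assumed to be defined earlier in the same file):

    # One load task per table, all built by the same helper.
    load_orders = bqLoad("load_orders", "orders", ["orders/2021-01-01.parquet"])
    load_users = bqLoad("load_users", "users", ["users/2021-01-01.parquet"])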

This approach is fine and follows the DRY principle (Don't Repeat Yourself, the principle of reducing duplication in software; you can read more about it here or in this book).

In addition, the Airflow documentation outlines various best practices that are worth reading through if you are interested.

Personally, I prefer to define a Custom Operator by extending another Operator, something like this:

    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
    from airflow.utils.decorators import apply_defaults


    class BigQueryLoadOperator(GoogleCloudStorageToBigQueryOperator):
        """Custom Operator that loads Parquet files from GCS into BigQuery."""

        @apply_defaults
        def __init__(self, table_id, file_list, *args, **kwargs) -> None:
            # Fill in the parent operator's required arguments here, so the
            # DAG only has to provide what varies per table.
            super(BigQueryLoadOperator, self).__init__(
                *args,
                bucket="bucket_name",
                destination_project_dataset_table="{}.{}.{}".format(
                    "project_id", "dataset_id", table_id),
                source_format="PARQUET",
                source_objects=file_list,
                create_disposition="CREATE_IF_NEEDED",
                write_disposition="WRITE_APPEND",
                **kwargs)

        def execute(self, context):
            # Implement the expected behavior here, taking advantage of the
            # attributes and methods inherited from the parent operator.
            pass
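
If you essentially want the parent operator's load behavior plus a bit of extra work around it, execute can simply delegate back to the parent. A minimal sketch of what could replace the placeholder above (the log message is only illustrative):

        def execute(self, context):
            # Optional custom pre-load logic, e.g. logging what will be loaded.
            self.log.info("Loading %s into %s",
                          self.source_objects,
                          self.destination_project_dataset_table)
            # Reuse the GCS-to-BigQuery load logic from the parent operator.
            return super(BigQueryLoadOperator, self).execute(context)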
        

Then use it from your DAG like this:

    load_to_bq_task = BigQueryLoadOperator(
        task_id='some_unique_id',
        table_id='table_unique_id',
        file_list=['file1','file2']
    )
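
And for the multi-table case from the question, the custom operator keeps each task down to the arguments that actually differ. A rough sketch, assuming the tasks are created inside a DAG context manager; the DAG parameters, table names and file lists are placeholders:

    from datetime import datetime

    from airflow import DAG

    # Hypothetical mapping of target tables to their source files in GCS.
    TABLES = {
        "orders": ["orders/part-0.parquet"],
        "users": ["users/part-0.parquet", "users/part-1.parquet"],
    }

    with DAG(dag_id="gcs_to_bq_loads",
             start_date=datetime(2021, 1, 1),
             schedule_interval="@daily") as dag:
        load_tasks = [
            BigQueryLoadOperator(
                task_id="load_{}".format(table_id),
                table_id=table_id,
                file_list=files,
            )
            for table_id, files in TABLES.items()
        ]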

Check out the provided link for more details and examples. Good luck!