使用 Apache Airflow 检查 Azure Datalake 上是否存在文件的最佳方法是什么?

What is the best way to check if a file exists on an Azure Datalake using Apache Airflow?

我有一个 DAG,用于检查文件是否已上传到特定目录中的 Azure DataLake。如果是这样,它允许其他 DAGs 运行.

我考虑过使用 FileSensor,但我认为 fsconnid 参数不足以针对 DataLake 进行身份验证

直接Azure provider but you can easily implement one since the AzureDataLakeHook has check_for_file function so all needed is to wrap this function with Sensor class implementing poke() function of BaseSensorOperator. By doing so you can use Microsoft Azure Data Lake Connection中没有AzureDataLakeSensor

我没有测试过,但这应该有效:

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.base import BaseSensorOperator

class MyAzureDataLakeSensor(BaseSensorOperator):
    """
    Sense for files in Azure Data Lake

    :param path: The Azure Data Lake path to find the objects. Supports glob
        strings (templated)
    :param azure_data_lake_conn_id: The Azure Data Lake conn
    """

    template_fields: Sequence[str] = ('path',)
    ui_color = '#901dd2'

    def __init__(
        self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.path = path
        self.azure_data_lake_conn_id = azure_data_lake_conn_id

    def poke(self, context: "Context") -> bool:
        hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
        self.log.info('Poking for file in path: %s', self.path)
        try:
            hook.check_for_file(file_path=self.path)
            return True
        except FileNotFoundError:
            pass
        return False

用法示例:

MyAzureDataLakeSensor(
    task_id='adls_sense',
    path='folder/file.csv',
    azure_data_lake_conn_id='azure_data_lake_default',
    mode='reschedule'
)

首先,看看official Microsoft Operators for Airflow

我们可以看到 Azure DataLake Storage 有专门的 Operator,不幸的是,目前似乎只有 ADLSDeleteOperator 可用。

ADLSDeleteOperator 使用 AzureDataLakeHook,您应该在自己的自定义运算符中重复使用它来检查文件是否存在。

我对您的建议是使用 ADLS 挂钩创建 CheckOperator 的子 class,使用挂钩的 check_for_file 功能检查输入中提供的文件是否存在。

更新:正如评论中所指出的,CheckOperator 似乎与 SQL 查询相关联并且已被弃用。使用您自己的自定义传感器或自定义运算符是可行的方法。

我在使用提议的 API 时遇到了严重的问题。所以我将 Microsoft API 嵌入到 Airflow 中。这工作正常。然后您需要做的就是使用此运算符并传递 account_url 和 access_token.

from azure.storage.filedatalake import DataLakeServiceClient
from airflow.sensors.base import BaseSensorOperator

class AzureDataLakeSensor(BaseSensorOperator):

   def __init__(self, path, filename, account_url, access_token, **kwargs):
      super().__init__(**kwargs)
      self._client = DataLakeServiceClient(
            account_url=account_url,
            credential=access_token
      )

      self.path = path
      self.filename = filename

  def poke(self, context):
      container = self._client.get_file_system_client(file_system="raw")
      dir_client = container.get_directory_client(self.path)
      file = dir_client.get_file_client(self.filename)
      return file.exists()