使用 Apache Airflow 检查 Azure Datalake 上是否存在文件的最佳方法是什么?
What is the best way to check if a file exists on an Azure Datalake using Apache Airflow?
我有一个 DAG,用于检查文件是否已上传到特定目录中的 Azure DataLake。如果是这样,它允许其他 DAGs 运行.
我考虑过使用 FileSensor,但我认为 fsconnid 参数不足以针对 DataLake 进行身份验证
直接Azure provider but you can easily implement one since the AzureDataLakeHook
has check_for_file
function so all needed is to wrap this function with Sensor class implementing poke()
function of BaseSensorOperator
. By doing so you can use Microsoft Azure Data Lake Connection中没有AzureDataLakeSensor
。
我没有测试过,但这应该有效:
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.base import BaseSensorOperator
class MyAzureDataLakeSensor(BaseSensorOperator):
"""
Sense for files in Azure Data Lake
:param path: The Azure Data Lake path to find the objects. Supports glob
strings (templated)
:param azure_data_lake_conn_id: The Azure Data Lake conn
"""
template_fields: Sequence[str] = ('path',)
ui_color = '#901dd2'
def __init__(
self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
) -> None:
super().__init__(**kwargs)
self.path = path
self.azure_data_lake_conn_id = azure_data_lake_conn_id
def poke(self, context: "Context") -> bool:
hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
self.log.info('Poking for file in path: %s', self.path)
try:
hook.check_for_file(file_path=self.path)
return True
except FileNotFoundError:
pass
return False
用法示例:
MyAzureDataLakeSensor(
task_id='adls_sense',
path='folder/file.csv',
azure_data_lake_conn_id='azure_data_lake_default',
mode='reschedule'
)
首先,看看official Microsoft Operators for Airflow。
我们可以看到 Azure DataLake Storage 有专门的 Operator,不幸的是,目前似乎只有 ADLSDeleteOperator
可用。
此 ADLSDeleteOperator
使用 AzureDataLakeHook,您应该在自己的自定义运算符中重复使用它来检查文件是否存在。
我对您的建议是使用 ADLS 挂钩创建 CheckOperator 的子 class,使用挂钩的 check_for_file
功能检查输入中提供的文件是否存在。
更新:正如评论中所指出的,CheckOperator 似乎与 SQL 查询相关联并且已被弃用。使用您自己的自定义传感器或自定义运算符是可行的方法。
我在使用提议的 API 时遇到了严重的问题。所以我将 Microsoft API 嵌入到 Airflow 中。这工作正常。然后您需要做的就是使用此运算符并传递 account_url 和 access_token.
from azure.storage.filedatalake import DataLakeServiceClient
from airflow.sensors.base import BaseSensorOperator
class AzureDataLakeSensor(BaseSensorOperator):
def __init__(self, path, filename, account_url, access_token, **kwargs):
super().__init__(**kwargs)
self._client = DataLakeServiceClient(
account_url=account_url,
credential=access_token
)
self.path = path
self.filename = filename
def poke(self, context):
container = self._client.get_file_system_client(file_system="raw")
dir_client = container.get_directory_client(self.path)
file = dir_client.get_file_client(self.filename)
return file.exists()
我有一个 DAG,用于检查文件是否已上传到特定目录中的 Azure DataLake。如果是这样,它允许其他 DAGs 运行.
我考虑过使用 FileSensor,但我认为 fsconnid 参数不足以针对 DataLake 进行身份验证
直接Azure provider but you can easily implement one since the AzureDataLakeHook
has check_for_file
function so all needed is to wrap this function with Sensor class implementing poke()
function of BaseSensorOperator
. By doing so you can use Microsoft Azure Data Lake Connection中没有AzureDataLakeSensor
。
我没有测试过,但这应该有效:
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.base import BaseSensorOperator
class MyAzureDataLakeSensor(BaseSensorOperator):
"""
Sense for files in Azure Data Lake
:param path: The Azure Data Lake path to find the objects. Supports glob
strings (templated)
:param azure_data_lake_conn_id: The Azure Data Lake conn
"""
template_fields: Sequence[str] = ('path',)
ui_color = '#901dd2'
def __init__(
self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
) -> None:
super().__init__(**kwargs)
self.path = path
self.azure_data_lake_conn_id = azure_data_lake_conn_id
def poke(self, context: "Context") -> bool:
hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
self.log.info('Poking for file in path: %s', self.path)
try:
hook.check_for_file(file_path=self.path)
return True
except FileNotFoundError:
pass
return False
用法示例:
MyAzureDataLakeSensor(
task_id='adls_sense',
path='folder/file.csv',
azure_data_lake_conn_id='azure_data_lake_default',
mode='reschedule'
)
首先,看看official Microsoft Operators for Airflow。
我们可以看到 Azure DataLake Storage 有专门的 Operator,不幸的是,目前似乎只有 ADLSDeleteOperator
可用。
此 ADLSDeleteOperator
使用 AzureDataLakeHook,您应该在自己的自定义运算符中重复使用它来检查文件是否存在。
我对您的建议是使用 ADLS 挂钩创建 CheckOperator 的子 class,使用挂钩的 check_for_file
功能检查输入中提供的文件是否存在。
更新:正如评论中所指出的,CheckOperator 似乎与 SQL 查询相关联并且已被弃用。使用您自己的自定义传感器或自定义运算符是可行的方法。
我在使用提议的 API 时遇到了严重的问题。所以我将 Microsoft API 嵌入到 Airflow 中。这工作正常。然后您需要做的就是使用此运算符并传递 account_url 和 access_token.
from azure.storage.filedatalake import DataLakeServiceClient
from airflow.sensors.base import BaseSensorOperator
class AzureDataLakeSensor(BaseSensorOperator):
def __init__(self, path, filename, account_url, access_token, **kwargs):
super().__init__(**kwargs)
self._client = DataLakeServiceClient(
account_url=account_url,
credential=access_token
)
self.path = path
self.filename = filename
def poke(self, context):
container = self._client.get_file_system_client(file_system="raw")
dir_client = container.get_directory_client(self.path)
file = dir_client.get_file_client(self.filename)
return file.exists()