如何在 Airflow DAG 中使用 WasbHook 从 Azure 获取 blob 列表

How to get blob list from Azure using WasbHook in Airflow DAG

我有一个用于 Azure Blob 存储的简单 DAG。

import airflow
from airflow import DAG
from airflow.contrib.hooks.wasb_hook import WasbHook
from airflow.operators.python_operator import PythonOperator

azure = WasbHook(wasb_conn_id='connect_to_azure')

args = {
    "owner": "Airflow",
    "start_date": airflow.utils.dates.days_ago(2)}

dag = DAG(
    dag_id="wasb_sensor_test",
    default_args=args,
    schedule_interval=None,
    tags=['poc', 'azure'])  
    
def get_blob_list():
    blob_list = azure.check_for_prefix(container_name='MY_CONTAINER_NAME', prefix='MY_PREFIX')
    
print_blob_list = PythonOperator(
    task_id='get_blob_list',
    python_callable=get_blob_list,
    dag=dag)
    

print_blob_list

我想获得一个列表,其中包含有关适当容器和前缀的 blob。正如我从钩子的代码源(https://github.com/apache/airflow/blob/6d612efc7e19fff01b0da98bc345320edde70237/airflow/providers/microsoft/azure/hooks/wasb.py#L73)中了解到的那样,如果调用 check_for_prefix 函数并添加和附加参数代替 ** 是可能的kwargs。 像

blob_list = azure.check_for_prefix(container_name='MY_CONTAINER_NAME', prefix='MY_PREFIX', blob_list_return)

但我不知道如何正确操作。

是我的错。 check_for_prefix 函数中 **kwargs 的所有参数都在调用 BlockBlobService.list_blobs[=17 时传递=](..., **kwargs) 函数里面。

对于带有更新的 Azure Blob 存储库 v12 的 WasbHook get_blobs_list 仍然可以像 BlockBlobService 一样工作。唯一的区别是它 returns 一个 blob 名称列表。

azure.get_blobs_list(container_name,prefix)