如何在 Airflow DAG 中使用 WasbHook 从 Azure 获取 blob 列表
How to get blob list from Azure using WasbHook in Airflow DAG
我有一个用于 Azure Blob 存储的简单 DAG。
import airflow
from airflow import DAG
from airflow.contrib.hooks.wasb_hook import WasbHook
from airflow.operators.python_operator import PythonOperator
azure = WasbHook(wasb_conn_id='connect_to_azure')
args = {
"owner": "Airflow",
"start_date": airflow.utils.dates.days_ago(2)}
dag = DAG(
dag_id="wasb_sensor_test",
default_args=args,
schedule_interval=None,
tags=['poc', 'azure'])
def get_blob_list():
blob_list = azure.check_for_prefix(container_name='MY_CONTAINER_NAME', prefix='MY_PREFIX')
print_blob_list = PythonOperator(
task_id='get_blob_list',
python_callable=get_blob_list,
dag=dag)
print_blob_list
我想获得一个列表,其中包含有关适当容器和前缀的 blob。正如我从钩子的代码源(https://github.com/apache/airflow/blob/6d612efc7e19fff01b0da98bc345320edde70237/airflow/providers/microsoft/azure/hooks/wasb.py#L73)中了解到的那样,如果调用 check_for_prefix 函数并添加和附加参数代替 ** 是可能的kwargs。
像
blob_list = azure.check_for_prefix(container_name='MY_CONTAINER_NAME', prefix='MY_PREFIX', blob_list_return)
但我不知道如何正确操作。
是我的错。 check_for_prefix 函数中 **kwargs 的所有参数都在调用 BlockBlobService.list_blobs[=17 时传递=](..., **kwargs) 函数里面。
对于带有更新的 Azure Blob 存储库 v12 的 WasbHook get_blobs_list 仍然可以像 BlockBlobService 一样工作。唯一的区别是它 returns 一个 blob 名称列表。
azure.get_blobs_list(container_name,prefix)
我有一个用于 Azure Blob 存储的简单 DAG。
import airflow
from airflow import DAG
from airflow.contrib.hooks.wasb_hook import WasbHook
from airflow.operators.python_operator import PythonOperator
azure = WasbHook(wasb_conn_id='connect_to_azure')
args = {
"owner": "Airflow",
"start_date": airflow.utils.dates.days_ago(2)}
dag = DAG(
dag_id="wasb_sensor_test",
default_args=args,
schedule_interval=None,
tags=['poc', 'azure'])
def get_blob_list():
blob_list = azure.check_for_prefix(container_name='MY_CONTAINER_NAME', prefix='MY_PREFIX')
print_blob_list = PythonOperator(
task_id='get_blob_list',
python_callable=get_blob_list,
dag=dag)
print_blob_list
我想获得一个列表,其中包含有关适当容器和前缀的 blob。正如我从钩子的代码源(https://github.com/apache/airflow/blob/6d612efc7e19fff01b0da98bc345320edde70237/airflow/providers/microsoft/azure/hooks/wasb.py#L73)中了解到的那样,如果调用 check_for_prefix 函数并添加和附加参数代替 ** 是可能的kwargs。 像
blob_list = azure.check_for_prefix(container_name='MY_CONTAINER_NAME', prefix='MY_PREFIX', blob_list_return)
但我不知道如何正确操作。
是我的错。 check_for_prefix 函数中 **kwargs 的所有参数都在调用 BlockBlobService.list_blobs[=17 时传递=](..., **kwargs) 函数里面。
对于带有更新的 Azure Blob 存储库 v12 的 WasbHook get_blobs_list 仍然可以像 BlockBlobService 一样工作。唯一的区别是它 returns 一个 blob 名称列表。
azure.get_blobs_list(container_name,prefix)