具有 blob 存储输入和输出的事件网格触发器 Azure Functions Python
Event Grid Trigger Azure Functions with blob storage input and output Python
我正在寻找可以帮助我开发和部署事件网格触发器的合适 resources/tutorials,该触发器将等待图像上传到 blob 容器,使用 python 处理该图像,然后将结果保存在另一个 blob 容器中。我发现了许多单独的文档,它们不一定在逻辑上引导我完成使用 Azure 门户和 VSCode 进行开发和部署,就像从头到尾的逐步演练一样,完成所有步骤这可能。
任何指导将不胜感激。
我不建议你直接使用Azure 门户进行开发。而既然需要使用基于Python开发Azure函数,那么就需要使用Azure函数核心工具
1、下载并安装Azure函数核心工具。(下载function3.x)
并且你需要安装python 3.8.x(请注意你必须下载64位,否则功能应用程序会在启动时出错。)
2、配置Python为系统路径
3、在VS code中安装函数扩展和Python调试扩展
4、按f1,输入'func',会找到创建函数选项。
5、按照创建函数的步骤进行操作。然后使用下面的代码:
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "eventGridTrigger",
"name": "event",
"direction": "in"
},
{
"name": "inputblob",
"type": "blob",
"path": "test1/6.txt",
"connection": "MyStorageConnectionAppSetting",
"direction": "in"
},
{
"name": "outputblob",
"type": "blob",
"path": "test2/6.txt",
"connection": "MyStorageConnectionAppSetting",
"direction": "out"
}
]
}
local.settings.json
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "",
"FUNCTIONS_WORKER_RUNTIME": "python",
"MyStorageConnectionAppSetting":"DefaultEndpointsProtocol=https;AccountName=0730bowmanwindow;AccountKey=xxxxxx==;EndpointSuffix=core.windows.net"
}
}
__init__py
import json
import logging
import azure.functions as func
def main(event: func.EventGridEvent, inputblob: func.InputStream, outputblob: func.Out[func.InputStream]):
result = json.dumps({
'id': event.id,
'data': event.get_json(),
'topic': event.topic,
'subject': event.subject,
'event_type': event.event_type,
})
logging.info('Python EventGrid trigger processed an event: %s', result)
#Do something here. you can change below inputblob to other stream.
outputblob.set(inputblob)
完成上述所有步骤后,创建事件网格订阅并将函数应用部署到 Azure。将事件网格指向您的函数应用。那么当A事件到来时,函数就会被触发。
需要注意的是Python中不支持Binder,所以...运行中无法指定路径。(很遗憾,Ibinder只支持.Net基于语言。)
以上是您的要求,它可以工作,但 blob 路径不能是动态的...我认为对于您的情况,更合适的方法是使用 blobtrigger 和 blob 输出绑定。(这可以从中获取 blob 名称触发输出。)
我正在寻找可以帮助我开发和部署事件网格触发器的合适 resources/tutorials,该触发器将等待图像上传到 blob 容器,使用 python 处理该图像,然后将结果保存在另一个 blob 容器中。我发现了许多单独的文档,它们不一定在逻辑上引导我完成使用 Azure 门户和 VSCode 进行开发和部署,就像从头到尾的逐步演练一样,完成所有步骤这可能。
任何指导将不胜感激。
我不建议你直接使用Azure 门户进行开发。而既然需要使用基于Python开发Azure函数,那么就需要使用Azure函数核心工具
1、下载并安装Azure函数核心工具。(下载function3.x)
并且你需要安装python 3.8.x(请注意你必须下载64位,否则功能应用程序会在启动时出错。)
2、配置Python为系统路径
3、在VS code中安装函数扩展和Python调试扩展
4、按f1,输入'func',会找到创建函数选项。
5、按照创建函数的步骤进行操作。然后使用下面的代码:
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "eventGridTrigger",
"name": "event",
"direction": "in"
},
{
"name": "inputblob",
"type": "blob",
"path": "test1/6.txt",
"connection": "MyStorageConnectionAppSetting",
"direction": "in"
},
{
"name": "outputblob",
"type": "blob",
"path": "test2/6.txt",
"connection": "MyStorageConnectionAppSetting",
"direction": "out"
}
]
}
local.settings.json
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "",
"FUNCTIONS_WORKER_RUNTIME": "python",
"MyStorageConnectionAppSetting":"DefaultEndpointsProtocol=https;AccountName=0730bowmanwindow;AccountKey=xxxxxx==;EndpointSuffix=core.windows.net"
}
}
__init__py
import json
import logging
import azure.functions as func
def main(event: func.EventGridEvent, inputblob: func.InputStream, outputblob: func.Out[func.InputStream]):
result = json.dumps({
'id': event.id,
'data': event.get_json(),
'topic': event.topic,
'subject': event.subject,
'event_type': event.event_type,
})
logging.info('Python EventGrid trigger processed an event: %s', result)
#Do something here. you can change below inputblob to other stream.
outputblob.set(inputblob)
完成上述所有步骤后,创建事件网格订阅并将函数应用部署到 Azure。将事件网格指向您的函数应用。那么当A事件到来时,函数就会被触发。
需要注意的是Python中不支持Binder,所以...运行中无法指定路径。(很遗憾,Ibinder只支持.Net基于语言。)
以上是您的要求,它可以工作,但 blob 路径不能是动态的...我认为对于您的情况,更合适的方法是使用 blobtrigger 和 blob 输出绑定。(这可以从中获取 blob 名称触发输出。)