具有 blob 存储输入和输出的事件网格触发器 Azure Functions Python

Event Grid Trigger Azure Functions with blob storage input and output Python

我正在寻找可以帮助我开发和部署事件网格触发器的合适 resources/tutorials,该触发器将等待图像上传到 blob 容器,使用 python 处理该图像,然后将结果保存在另一个 blob 容器中。我发现了许多单独的文档,它们不一定在逻辑上引导我完成使用 Azure 门户和 VSCode 进行开发和部署,就像从头到尾的逐步演练一样,完成所有步骤这可能。

任何指导将不胜感激。

我不建议你直接使用Azure 门户进行开发。而既然需要使用基于Python开发Azure函数,那么就需要使用Azure函数核心工具

1、下载并安装Azure函数核心工具。(下载function3.x)

https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local?tabs=windows%2Ccsharp%2Cbash#v2

并且你需要安装python 3.8.x(请注意你必须下载64位,否则功能应用程序会在启动时出错。)

2、配置Python为系统路径

3、在VS code中安装函数扩展和Python调试扩展

4、按f1,输入'func',会找到创建函数选项。

5、按照创建函数的步骤进行操作。然后使用下面的代码:

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventGridTrigger",
      "name": "event",
      "direction": "in"
    },
    {
      "name": "inputblob",
      "type": "blob",
      "path": "test1/6.txt",
      "connection": "MyStorageConnectionAppSetting",
      "direction": "in"
    },
    {
      "name": "outputblob",
      "type": "blob",
      "path": "test2/6.txt",
      "connection": "MyStorageConnectionAppSetting",
      "direction": "out"
    }
  ]
}

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "MyStorageConnectionAppSetting":"DefaultEndpointsProtocol=https;AccountName=0730bowmanwindow;AccountKey=xxxxxx==;EndpointSuffix=core.windows.net"
  }
}

__init__py

import json
import logging

import azure.functions as func


def main(event: func.EventGridEvent, inputblob: func.InputStream, outputblob: func.Out[func.InputStream]):
    result = json.dumps({
        'id': event.id,
        'data': event.get_json(),
        'topic': event.topic,
        'subject': event.subject,
        'event_type': event.event_type,
    })

    logging.info('Python EventGrid trigger processed an event: %s', result)

    #Do something here. you can change below inputblob to other stream.

    outputblob.set(inputblob)

完成上述所有步骤后,创建事件网格订阅并将函数应用部署到 Azure。将事件网格指向您的函数应用。那么当A事件到来时,函数就会被触发。

需要注意的是Python中不支持Binder,所以...运行中无法指定路径。(很遗憾,Ibinder只支持.Net基于语言。)

以上是您的要求,它可以工作,但 blob 路径不能是动态的...我认为对于您的情况,更合适的方法是使用 blobtrigger 和 blob 输出绑定。(这可以从中获取 blob 名称触发输出。)