Azure function to receive Event Hub data and write it to Azure Blob Storage using Python

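I want to create an Azure function in Python that reads data from Azure Event Hub. Fortunately, Visual Studio Code provides a way to scaffold an Azure function, which can then be edited as needed. With the help of the Microsoft documentation I was able to create a demo HTTP-trigger Azure function, but I don't know what changes I should make to the function so that it reads data from the event hub and writes it to Azure Blob Storage. I would also appreciate pointers to any blogs with more details on Azure Functions and standard practices.

Update:

I tried updating my code based on @Stanley's suggestion, but it probably still needs changes. I have written the following code in my Azure function.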

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "Storage account connection string",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "EventHub_ReceiverConnectionString": "Endpoint Connection String of the EventHubNamespace",
    "Blob_StorageConnectionString": "Storage account connection string"
  }
}

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "eventHubTrigger",
      "direction": "in",
      "name": "event",
      "eventHubName": "pwo-events",
      "connection": "EventHub_ReceiverConnectionString",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "dataType": "binary"
    }
  ]
}

__init__.py

import logging
import azure.functions as func
from azure.storage.blob import BlobClient

storage_connection_string='Storage account connection string'
container_name = ''


def main(event: func.EventHubEvent):
    logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
    logging.info(f'  SequenceNumber = {event.sequence_number}')
    logging.info(f'  Offset = {event.offset}')
    blob_client = BlobClient.from_connection_string(storage_connection_string,container_name,str(event.sequence_number) + ".txt")
    blob_client.upload_blob(event.get_body().decode())

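Below is a screenshot of my blob container:

After running the code above, something was written to the blob container, but it was saved in some other format rather than as a .txt file. Also, if I trigger the Azure function multiple times, the file gets overwritten. I want to append rather than overwrite. I would also like to save my files in a user-defined location, for example: container/Year=/month=/date= Thanks!!

If you want to read data from Azure Event Hub, using an Event Hub trigger is much easier. Here is my test code (it reads the data and writes it to storage):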

import datetime
import logging

import azure.functions as func
from azure.storage.blob import BlobClient

storage_connection_string = ''
container_name = ''


def main(event: func.EventHubEvent):
    body = event.get_body().decode()
    logging.info(f'Function triggered to process a message: {body}')
    logging.info(f'  SequenceNumber = {event.sequence_number}')
    logging.info(f'  Offset = {event.offset}')

    # Compute the date on every invocation so the path follows the current day.
    today = datetime.datetime.today()
    # Slashes in the blob name act as virtual folders: <year>/<month>/<day>.txt
    blob_client = BlobClient.from_connection_string(
        storage_connection_string, container_name,
        f'{today.year}/{today.month}/{today.day}.txt')

    # With an append blob, each upload is added to the end of the existing blob.
    blob_client.upload_blob(body, blob_type="AppendBlob")
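
The question asks for a layout like container/Year=/month=/date=, while the code above writes to <year>/<month>/<day>.txt. Since blob names may contain slashes, that layout can be produced just by changing the blob name. Below is a minimal sketch; the helper name partitioned_blob_name, the default file name events.txt, and the zero-padded month and day are assumptions for illustration, not part of the original answer.

```python
import datetime


def partitioned_blob_name(when: datetime.datetime,
                          file_name: str = "events.txt") -> str:
    """Build a blob name such as 'Year=2021/month=03/date=05/events.txt'.

    Hypothetical helper: the key names follow the layout asked for in the
    question; the file name and zero padding are illustrative choices.
    """
    return f"Year={when:%Y}/month={when:%m}/date={when:%d}/{file_name}"


# Possible use inside main(): pass the result as the blob name, e.g.
#   name = partitioned_blob_name(datetime.datetime.now(datetime.timezone.utc))
#   BlobClient.from_connection_string(conn_str, container_name, name)
```

Using a UTC timestamp keeps the folder boundaries independent of the time zone of the machine running the function.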

I used the code below to send events to the event hub:

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(conn_str="<conn string>", eventhub_name="<hub name>")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

# asyncio.run creates the event loop, runs the coroutine, and closes the loop.
asyncio.run(run())

My local.settings.json:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "<storage account conn str>",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "testhubname0123_test_EVENTHUB": "<event hub conn str>"
    }
}
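
When the function runs locally, the entries under "Values" are exposed to the Python code as environment variables, so the storage connection string does not have to be hardcoded in __init__.py. The following is a small sketch under the assumption that a setting named Blob_StorageConnectionString (the name used in the question's settings file) has been added to "Values"; get_setting is a hypothetical helper name.

```python
import os


def get_setting(name: str) -> str:
    """Return an application setting, failing loudly if it is missing.

    Hypothetical helper: it only reads environment variables, which is
    where the values from local.settings.json end up at run time.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing application setting: {name}")
    return value


# Possible use at the top of __init__.py (setting name is an assumption):
#   storage_connection_string = get_setting("Blob_StorageConnectionString")
```

Failing with an explicit message makes a missing or misspelled setting easier to spot than an authentication error raised later by the storage client.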

My function.json, as shown in this doc:

{
    "scriptFile": "__init__.py",
    "bindings": [{
        "type": "eventHubTrigger",
        "name": "event",
        "direction": "in",
        "eventHubName": "test01(this is my hub name, please place yours here)",
        "connection": "testhubname0123_test_EVENTHUB"
    }]
}
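
Note that the question's function.json sets "cardinality": "many", while this binding omits it. With "many", the function receives a list of events rather than a single event, so main would need to accept something like List[func.EventHubEvent]. The sketch below shows one way such a batch could be turned into a single payload for one append call; join_bodies is a hypothetical helper, and putting each event on its own line is an assumption about the desired file format.

```python
from typing import Iterable


def join_bodies(events: Iterable) -> bytes:
    """Concatenate event bodies, terminating each one with a newline.

    Hypothetical helper: 'events' is any iterable of objects that expose
    get_body() returning bytes, as func.EventHubEvent does.
    """
    return b"".join(event.get_body() + b"\n" for event in events)


# Possible use with "cardinality": "many" (signature is an assumption):
#   def main(events: List[func.EventHubEvent]):
#       blob_client.upload_blob(join_bodies(events), blob_type="AppendBlob")
```

Uploading the batch in one call means one append operation per trigger invocation instead of one per event.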

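Result

Run the function and send data to the event hub using the code above:

The data was saved to storage successfully:

Downloading the .txt file shows that the contents of all 3 events were written: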