azure 函数 - 在 eventhub 中发生新事件时触发,将其写入 cosmos db - 不起作用,为什么?
azure function - triggered when new event in eventhub, writing it to cosmos db - not working, why?
希望你能帮帮我。
我是 Azure 的新手,在理解它时遇到了很多困难。
我正在尝试编写一个 azure 函数,它由 EventHubTrigger 触发(当一个新事件发送到 eventHub 时),并将该事件存储在我的 table 中的 cosmos 数据库中。 (cosmos db 作为输出)。
我正在用 C# 编写,因此 function.json 是自动创建的,我无法对其进行编辑。
我似乎无法正常工作,无法正确设置触发器和输出绑定。
这是我的功能代码:
[FunctionName("InsertEvent")]
public static void Run(
[EventHubTrigger("WaterlyNamespace",
Connection = "connectionStr")] string eventHubString,
[CosmosDB(
databaseName: "waterly_db",
collectionName: "water_table",
Id = "device_id",
ConnectionStringSetting = "conStr" )] out dynamic dbItem,
ILogger log)
{
log.LogInformation("C# trigger function processed an event from eventhub");
EventItem dataJson = JsonConvert.DeserializeObject<EventItem>(eventHubString);
//adding timestamp to event json
dataJson.timestamp = DateTime.Now;
dbItem = dataJson;
}
这是function.json生成的:
{
"generatedBy": "Microsoft.NET.Sdk.Functions-3.0.3",
"configurationSource": "attributes",
"bindings": [
{
"type": "eventHubTrigger",
"connection": "ConnectionStr",
"eventHubName": "WaterlyNamespace",
"name": "eventHubString"
}
],
"disabled": false,
"scriptFile": "../bin/Waterly-iot-functions.dll",
"entryPoint": "Waterly_iot_functions.InsertEvent.Run"
}
这是host.json:
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingExcludedTypes": "Request",
"samplingSettings": {
"isEnabled": true
}
}
},
"extensions": {
"cosmosDB": {
"connectionMode": "Gateway",
"protocol": "Https",
"leaseOptions": {
"leasePrefix": "prefix1"
}
}
}
}
这是发布此代码后我在 Azure 门户中看到的内容:
See Image
知道为什么触发器位于 Azure 门户的输出区域中,
我错过了什么?
如有任何帮助,我们将不胜感激。
谢谢,
我认为您对属性中连接字符串的使用有疑问。
按照我的步骤,它可以正常工作:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
namespace FunctionApp54
{
public static class Function1
{
[FunctionName("Function1")]
public static void Run([EventHubTrigger("test", Connection = "str")] EventData[] events,
[CosmosDB(
databaseName: "testbowman",
collectionName: "testbowman",
ConnectionStringSetting = "CosmosDBConnection",
PartitionKey = "111")]out dynamic item,
ILogger log)
{
item = new { id = Guid.NewGuid() , custom = "11111111111111111111"};
}
}
}
这是我的local.settings.json:(在本地,环境变量设置在local.settings.json)
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"str": "Endpoint=sb://testbowman.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx",
"CosmosDBConnection": "AccountEndpoint=https://testbowman.documents.azure.com:443/;AccountKey=xxxxxx;"
}
}
您应该从这些地方获取连接字符串:
然后我创建了一个控制台应用程序来将事件发送到事件中心。
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
namespace SendEventToEventHub
{
class Program
{
private const string connectionString = "Endpoint=sb://testbowman.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=pftXmTesAa894OWYGZyD5s8GynR9hXVJl7CdbMy45Nc=";
private const string eventHubName = "test";
static async Task Main(string[] args)
{
// Create a producer client that you can use to send events to an event hub
await using (var producerClient = new EventHubProducerClient(connectionString, eventHubName))
{
// Create a batch of events
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
// Add events to the batch. An event is a represented by a collection of bytes and metadata.
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("First event")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Second event")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Third event")));
// Use the producer client to send the batch of events to the event hub
await producerClient.SendAsync(eventBatch);
Console.WriteLine("A batch of 3 events has been published.");
}
}
}
}
之后,我启动我的函数,我可以在 cosmosdb 中看到输出:
顺便说一句,如果你部署到 azure,设置应该在下面设置而不是 local.settings.json:
如上尝试后是否能成功请告知。祝你好运。:)
希望你能帮帮我。
我是 Azure 的新手,在理解它时遇到了很多困难。 我正在尝试编写一个 azure 函数,它由 EventHubTrigger 触发(当一个新事件发送到 eventHub 时),并将该事件存储在我的 table 中的 cosmos 数据库中。 (cosmos db 作为输出)。
我正在用 C# 编写,因此 function.json 是自动创建的,我无法对其进行编辑。 我似乎无法正常工作,无法正确设置触发器和输出绑定。
这是我的功能代码:
[FunctionName("InsertEvent")]
public static void Run(
[EventHubTrigger("WaterlyNamespace",
Connection = "connectionStr")] string eventHubString,
[CosmosDB(
databaseName: "waterly_db",
collectionName: "water_table",
Id = "device_id",
ConnectionStringSetting = "conStr" )] out dynamic dbItem,
ILogger log)
{
log.LogInformation("C# trigger function processed an event from eventhub");
EventItem dataJson = JsonConvert.DeserializeObject<EventItem>(eventHubString);
//adding timestamp to event json
dataJson.timestamp = DateTime.Now;
dbItem = dataJson;
}
这是function.json生成的:
{
"generatedBy": "Microsoft.NET.Sdk.Functions-3.0.3",
"configurationSource": "attributes",
"bindings": [
{
"type": "eventHubTrigger",
"connection": "ConnectionStr",
"eventHubName": "WaterlyNamespace",
"name": "eventHubString"
}
],
"disabled": false,
"scriptFile": "../bin/Waterly-iot-functions.dll",
"entryPoint": "Waterly_iot_functions.InsertEvent.Run"
}
这是host.json:
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingExcludedTypes": "Request",
"samplingSettings": {
"isEnabled": true
}
}
},
"extensions": {
"cosmosDB": {
"connectionMode": "Gateway",
"protocol": "Https",
"leaseOptions": {
"leasePrefix": "prefix1"
}
}
}
}
这是发布此代码后我在 Azure 门户中看到的内容: See Image
知道为什么触发器位于 Azure 门户的输出区域中, 我错过了什么?
如有任何帮助,我们将不胜感激。 谢谢,
我认为您对属性中连接字符串的使用有疑问。
按照我的步骤,它可以正常工作:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
namespace FunctionApp54
{
public static class Function1
{
[FunctionName("Function1")]
public static void Run([EventHubTrigger("test", Connection = "str")] EventData[] events,
[CosmosDB(
databaseName: "testbowman",
collectionName: "testbowman",
ConnectionStringSetting = "CosmosDBConnection",
PartitionKey = "111")]out dynamic item,
ILogger log)
{
item = new { id = Guid.NewGuid() , custom = "11111111111111111111"};
}
}
}
这是我的local.settings.json:(在本地,环境变量设置在local.settings.json)
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"str": "Endpoint=sb://testbowman.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx",
"CosmosDBConnection": "AccountEndpoint=https://testbowman.documents.azure.com:443/;AccountKey=xxxxxx;"
}
}
您应该从这些地方获取连接字符串:
然后我创建了一个控制台应用程序来将事件发送到事件中心。
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
namespace SendEventToEventHub
{
class Program
{
private const string connectionString = "Endpoint=sb://testbowman.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=pftXmTesAa894OWYGZyD5s8GynR9hXVJl7CdbMy45Nc=";
private const string eventHubName = "test";
static async Task Main(string[] args)
{
// Create a producer client that you can use to send events to an event hub
await using (var producerClient = new EventHubProducerClient(connectionString, eventHubName))
{
// Create a batch of events
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
// Add events to the batch. An event is a represented by a collection of bytes and metadata.
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("First event")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Second event")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Third event")));
// Use the producer client to send the batch of events to the event hub
await producerClient.SendAsync(eventBatch);
Console.WriteLine("A batch of 3 events has been published.");
}
}
}
}
之后,我启动我的函数,我可以在 cosmosdb 中看到输出:
顺便说一句,如果你部署到 azure,设置应该在下面设置而不是 local.settings.json:
如上尝试后是否能成功请告知。祝你好运。:)