带有事件中心输出绑定的 Azure 函数不起作用
Azure Function with Event Hub out binding does not work
我正在使用 Azure Function 将消息发送到多个事件中心输出(从输入 EH)。我的代码如下:
/// <summary>
/// Azure Function "Gateway": triggered by a batch of events from the input Event Hub and
/// bound to two Event Hub outputs (hot path and cold path) through <c>out EventData</c> parameters.
/// </summary>
/// <remarks>
/// NOTE(review): an <c>out EventData</c> parameter holds a single value, so every assignment
/// inside the loop replaces the previous one, and the last statement of the method resets both
/// outputs to null. As written, no message assigned in the loop survives to the end of the method.
/// </remarks>
[FunctionName("Gateway")]
public static void Run([EventHubTrigger("%INPUT_HUB_NAME%", Connection = "iothubconnection", ConsumerGroup = "functiontest")]EventData[] eventHubMessage,
[EventHub("%OUT_HUB_NAME%", Connection = "eventhuboutput")]out EventData outputEventHubMessageHotPath,
[EventHub("%OUT_HUB_NAME%", Connection = "eventhuboutput2")]out EventData outputEventHubMessageColdPath,
TraceWriter log)
{
log.Info("**-- Start Azure Func -- **");
foreach (var ehMsg in eventHubMessage)
{
//section to build up the raw section
var rawMessageSection = GetPayload(ehMsg.GetBytes());
var deviceId = GetDeviceId(ehMsg);
log.Info($"Extracted deviceId: {deviceId}");
// Aggregates are the only section assigned to both the hot and the cold output.
if (rawMessageSection.aggregates != null)
{
var message = CreateEHMessages("aggregates", rawMessageSection, deviceId, log);
outputEventHubMessageHotPath = message;
outputEventHubMessageColdPath = message;
}
// Every section below is assigned to the cold output only; each assignment
// overwrites whatever cold-path message was assigned before it.
if (rawMessageSection.events != null)
{
outputEventHubMessageColdPath = CreateEHMessages("events", rawMessageSection, deviceId, log);
}
if (rawMessageSection.ipis != null)
{
outputEventHubMessageColdPath = CreateEHMessages("ipis", rawMessageSection, deviceId, log);
}
if (rawMessageSection.errors != null)
{
outputEventHubMessageColdPath = CreateEHMessages("errors", rawMessageSection, deviceId, log);
}
if (rawMessageSection.batteries != null)
{
outputEventHubMessageColdPath = CreateEHMessages("batteries", rawMessageSection, deviceId, log);
}
//await Task.WhenAll(tasks);
}
// Both outputs end up null here, discarding anything assigned inside the loop.
// (The assignment also satisfies C#'s rule that out parameters be assigned before returning.)
outputEventHubMessageHotPath = outputEventHubMessageColdPath = null;
}
其中:
/// <summary>
/// Builds a single <see cref="EventData"/> whose body is the JSON array of every message
/// found under <paramref name="messageType"/> in the batch, after stamping batch-level
/// metadata onto each message.
/// </summary>
/// <param name="messageType">Key used to index <paramref name="messageBatch"/>; also written to each message's <c>type</c>.</param>
/// <param name="messageBatch">Dynamic payload read for <c>timezone</c>, <c>deviceInstanceId</c> and the section indexed by <paramref name="messageType"/>.</param>
/// <param name="deviceId">Device identifier copied onto every message.</param>
/// <param name="log">Not used by this method; kept so existing call sites keep compiling.</param>
/// <returns>An event carrying the UTF-8 encoded JSON of the enriched messages, with its partition key set to the batch's <c>deviceInstanceId</c>.</returns>
public static EventData CreateEHMessages(string messageType, dynamic messageBatch, string deviceId, TraceWriter log)
{
    var timezone = messageBatch.timezone;
    var deviceInstanceId = messageBatch.deviceInstanceId;
    var result = new List<dynamic>();

    foreach (var msg in messageBatch[messageType])
    {
        // Enrich each message in place with the batch-level metadata before collecting it.
        msg.deviceId = deviceId;
        msg.timezone = timezone;
        msg.deviceInstanceId = deviceInstanceId;
        msg.type = messageType;
        result.Add(msg);
    }

    var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(result)));
    eventData.PartitionKey = deviceInstanceId;
    return eventData;
}
问题是这个函数似乎没有发布到 EventHub。我尝试使用不同的绑定语法,但我无法让它工作。
我怀疑与输出绑定有关,但我再次尝试了很多选项。
有什么想法吗?
您多次给 out 参数赋值,因此除最后一次赋值之外,其余的赋值都会丢失。而您的最后一次赋值又把它们设置为 null,这实际上意味着您的函数没有返回任何消息。
请改用 ICollector。
将您的输出参数定义为收集器:
// Output binding declared as a collector instead of `out EventData`,
// so that each message can be added to it individually.
[EventHub("%OUT_HUB_NAME%", Connection = "eventhuboutput")]
ICollector<EventData> outputEventHubMessageHotPath,
然后将每条消息添加到收集器,例如:
if (rawMessageSection.events != null)
{
    // Build the message for the "events" section, then add it to the collector.
    var eventsMessage = CreateEHMessages("events", rawMessageSection, deviceId, log);
    outputEventHubMessageHotPath.Add(eventsMessage);
}
我正在使用 Azure Function 将消息发送到多个事件中心输出(从输入 EH)。我的代码如下:
/// <summary>
/// Azure Function "Gateway": triggered by a batch of events from the input Event Hub and
/// bound to two Event Hub outputs (hot path and cold path) through <c>out EventData</c> parameters.
/// </summary>
/// <remarks>
/// NOTE(review): an <c>out EventData</c> parameter holds a single value, so every assignment
/// inside the loop replaces the previous one, and the last statement of the method resets both
/// outputs to null. As written, no message assigned in the loop survives to the end of the method.
/// </remarks>
[FunctionName("Gateway")]
public static void Run([EventHubTrigger("%INPUT_HUB_NAME%", Connection = "iothubconnection", ConsumerGroup = "functiontest")]EventData[] eventHubMessage,
[EventHub("%OUT_HUB_NAME%", Connection = "eventhuboutput")]out EventData outputEventHubMessageHotPath,
[EventHub("%OUT_HUB_NAME%", Connection = "eventhuboutput2")]out EventData outputEventHubMessageColdPath,
TraceWriter log)
{
log.Info("**-- Start Azure Func -- **");
foreach (var ehMsg in eventHubMessage)
{
//section to build up the raw section
var rawMessageSection = GetPayload(ehMsg.GetBytes());
var deviceId = GetDeviceId(ehMsg);
log.Info($"Extracted deviceId: {deviceId}");
// Aggregates are the only section assigned to both the hot and the cold output.
if (rawMessageSection.aggregates != null)
{
var message = CreateEHMessages("aggregates", rawMessageSection, deviceId, log);
outputEventHubMessageHotPath = message;
outputEventHubMessageColdPath = message;
}
// Every section below is assigned to the cold output only; each assignment
// overwrites whatever cold-path message was assigned before it.
if (rawMessageSection.events != null)
{
outputEventHubMessageColdPath = CreateEHMessages("events", rawMessageSection, deviceId, log);
}
if (rawMessageSection.ipis != null)
{
outputEventHubMessageColdPath = CreateEHMessages("ipis", rawMessageSection, deviceId, log);
}
if (rawMessageSection.errors != null)
{
outputEventHubMessageColdPath = CreateEHMessages("errors", rawMessageSection, deviceId, log);
}
if (rawMessageSection.batteries != null)
{
outputEventHubMessageColdPath = CreateEHMessages("batteries", rawMessageSection, deviceId, log);
}
//await Task.WhenAll(tasks);
}
// Both outputs end up null here, discarding anything assigned inside the loop.
// (The assignment also satisfies C#'s rule that out parameters be assigned before returning.)
outputEventHubMessageHotPath = outputEventHubMessageColdPath = null;
}
其中:
/// <summary>
/// Builds a single <see cref="EventData"/> whose body is the JSON array of every message
/// found under <paramref name="messageType"/> in the batch, after stamping batch-level
/// metadata onto each message.
/// </summary>
/// <param name="messageType">Key used to index <paramref name="messageBatch"/>; also written to each message's <c>type</c>.</param>
/// <param name="messageBatch">Dynamic payload read for <c>timezone</c>, <c>deviceInstanceId</c> and the section indexed by <paramref name="messageType"/>.</param>
/// <param name="deviceId">Device identifier copied onto every message.</param>
/// <param name="log">Not used by this method; kept so existing call sites keep compiling.</param>
/// <returns>An event carrying the UTF-8 encoded JSON of the enriched messages, with its partition key set to the batch's <c>deviceInstanceId</c>.</returns>
public static EventData CreateEHMessages(string messageType, dynamic messageBatch, string deviceId, TraceWriter log)
{
    var timezone = messageBatch.timezone;
    var deviceInstanceId = messageBatch.deviceInstanceId;
    var result = new List<dynamic>();

    foreach (var msg in messageBatch[messageType])
    {
        // Enrich each message in place with the batch-level metadata before collecting it.
        msg.deviceId = deviceId;
        msg.timezone = timezone;
        msg.deviceInstanceId = deviceInstanceId;
        msg.type = messageType;
        result.Add(msg);
    }

    var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(result)));
    eventData.PartitionKey = deviceInstanceId;
    return eventData;
}
问题是这个函数似乎没有发布到 EventHub。我尝试使用不同的绑定语法,但我无法让它工作。 我怀疑与输出绑定有关,但我再次尝试了很多选项。 有什么想法吗?
您多次给 out 参数赋值,因此除最后一次赋值之外,其余的赋值都会丢失。而您的最后一次赋值又把它们设置为 null,这实际上意味着您的函数没有返回任何消息。
请改用 ICollector。
将您的输出参数定义为收集器:
// Output binding declared as a collector instead of `out EventData`,
// so that each message can be added to it individually.
[EventHub("%OUT_HUB_NAME%", Connection = "eventhuboutput")]
ICollector<EventData> outputEventHubMessageHotPath,
然后将每条消息添加到收集器,例如:
if (rawMessageSection.events != null)
{
    // Build the message for the "events" section, then add it to the collector.
    var eventsMessage = CreateEHMessages("events", rawMessageSection, deviceId, log);
    outputEventHubMessageHotPath.Add(eventsMessage);
}