将分析流式传输到事件中心 - 意外串联事件

Stream Analytics to Event Hub - Unexpectedly concatenating events

我有一个流分析作业,它正在使用 avro 消息的事件中心(我们称之为 RawEvents),transforming/flattening 消息并将它们发送到一个单独的事件中心(我们称之为格式化事件)。

RawEvents 中的每个 EventData 实例都包含一个顶级 json 对象,该对象具有一组更详细的事件。这是一个人为的例子:

[{ "Events": [{ "dataOne": 123.0, "dataTwo": 234.0, "subEventCode": 3, "dateTimeLocal": 1482170771, "dateTimeUTC": 1482192371 }, { "dataOne": 456.0, "dataTwo": 789.0, "subEventCode": 20, "dateTimeLocal": 1482170771, "dateTimeUTC": 1482192371 }], "messageType": "myDeviceType-Events", "deviceID": "myDevice", }]

流分析作业将结果展平并解包 subEventCode,这是一个位掩码。结果看起来像这样:

{"messagetype":"myDeviceType-Event","deviceid":"myDevice",eventid:1,"dataone":123,"datatwo":234,"subeventcode":6,"flag1":0,"flag2":1,"flag3":1,"flag4":0,"flag5":0,"flag6":0,"flag7":0,"flag8":0,"flag9":0,"flag10":0,"flag11":0,"flag12":0,"flag13":0,"flag14":0,"flag15":0,"flag16":0,"eventepochlocal":"2016-12-06T17:33:11.0000000Z","eventepochutc":"2016-12-06T23:33:11.0000000Z"} {"messagetype":"myDeviceType-Event","deviceid":"myDevice",eventid:2,"dataone":456,"datatwo":789,"subeventcode":8,"flag1":0,"flag2":0,"flag3":0,"flag4":1,"flag5":0,"flag6":0,"flag7":0,"flag8":0,"flag9":0,"flag10":0,"flag11":0,"flag12":0,"flag13":0,"flag14":0,"flag15":0,"flag16":0,"eventepochlocal":"2016-12-06T17:33:11.0000000Z","eventepochutc":"2016-12-06T23:33:11.0000000Z"}

当我从 FormattedEvents 事件中心提取消息时,我希望看到两个 EventData 实例。我得到的是同一条消息中包含两个 "flattened" 事件的单个 EventData。当以 blob 存储或数据湖为目标时,这是预期的行为,但在以事件中心为目标时却令人惊讶。我的期望是类似于服务总线的行为。

这是预期的行为吗?如果是这样,是否有强制行为的配置选项?

是的,这是当前的预期行为。目的是提高吞吐量,尝试在 EventHub 消息 (EventData) 中发送尽可能多的事件。

不幸的是,目前还没有配置选项可以覆盖此行为。一种可能值得尝试的方法是将输出分区键的概念用于超级独特的东西(即将此列添加到您的查询中—— GetMetadataPropertyValue(ehInput, "EventId") as outputpk )。现在在输出 EventHub 的 ASA 设置中将 "outputpk" 指定为 PartitionKey。

如果有帮助请告诉我。

干杯 车坛

我遇到了同样的问题。感谢您手动格式化输入消息的答案。我和我的同事用几行代码解决了它,删除了换行和回车return。然后我将“}{”替换为“},{”,并通过在两端添加“[”和“]”使其成为一个数组。

string modifiedMessage = myEventHubMessage.Replace("\n","").Replace("\r","");    
modifiedMessage = "[" + modifiedMessage.Replace("}{","},{") + "]";

然后根据其数据结构将输入作为对象列表:

List<TelemetryDataPoint> newDataPoints = new List<TelemetryDataPoint>();
try
{
    newDataPoints = Newtonsoft.Json.JsonConvert.DeserializeObject<List<TelemetryDataPoint>>(modifiedMessage);

..... ....