Azure 数据资源管理器:如何使用 Kusto 和更新策略从原始 JSON 事件更新 table?

Azure Data Explorer: how to update a table from a raw JSON event using Kusto and update policies?

我将原始遥测数据作为 JSON 记录提取到名为 RawEvents 的单列 table 中,位于名为 Event 的列中。这是 record/event 的样子:

{
  "clientId": "myclient1",
  "IotHubDeviceId": "myiothubdevice1",
  "deviceId": "mydevice1",
  "timestamp": "2022-04-12T10:29:00.123",
  "telemetry": [
    {
        "telemetryId: "total.power"
        "value": 123.456
    },
    {
        "telemetryId: "temperature"
        "value": 34.56
    },
    ...
  ]
}

RawEvents table 是这样创建和设置的:

.create table RawEvents (Event: dynamic)
.create table RawEvents ingestion json mapping 'MyRawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'

还有Telemetry table 会用来查询和分析。 Telemetry table 具有与 RawEvents table 中的原始数据记录结构相匹配的强类型列。它是这样创建的:

.create table Telemetry (ClientId:string, IotHubDeviceId:string, DeviceId:string, Timestamp:datetime, TelemetryId:string, Value: real)

为了让 Telemetry table 在新的原始事件被摄取到 RawEvents 时更新记录,我尝试定义一个数据转换函数并使用该函数在将附加到 Telemetry table.

的更新策略中

为此,我使用了以下脚本来验证我的数据转换逻辑是否按预期工作:

datatable (event:dynamic)
[
    dynamic(
        {
            "clientId": "myclient1",
            "IotHubDeviceId": "myiothubdevice1",
            "deviceId": "mydevice1", 
            "timestamp": "2022-04-12T10:29:00.123",
            "telemetry": [
                {
                    "telemetryId": "total.power",
                    "value": 123.456
                },
                {
                    "telemetryId": "temperature",
                    "value": 34.56
                }
            ]
        }
    )
]
| evaluate bag_unpack(event)
| mv-expand telemetry
| evaluate bag_unpack(telemetry)

执行该脚本会得到与 Telemetry table 结构匹配的所需输出:

clientId  deviceId  IotHubDeviceId  timestamp  telemetryId  value
myclient1  mydevice1  myiothubdevice1  2022-04-12T10:29:00.123Z  total.power  123.456
myclient1  mydevice1  myiothubdevice1  2022-04-12T10:29:00.123Z  temperature  34.56

接下来,我创建了一个名为 ExpandTelemetryEvent 的函数,其中包含应用于 RawEvents.Event:

的相同数据转换逻辑
.create function ExpandTelemetryEvent() {
    RawEvents
        | evaluate bag_unpack(Event)
        | mv-expand telemetry
        | evaluate bag_unpack(telemetry)
}

作为最后一步,我尝试为 Telemetry table 创建一个更新策略,它将使用 RawEvents 作为源并使用 ExpandTelemetryEvent() 作为变换函数:

.alter table Telemetry policy update @'[{"Source": "RawEvents", "Query": "ExpandTelemetryEvent()", "IsEnabled": "True"}]'

这是我收到错误消息的地方

Error during execution of a policy operation: Caught exception while validating query for Update Policy: 'IsEnabled = 'True', Source = 'RawEvents', Query = 'ExpandTelemetryEvent()', IsTransactional = 'False', PropagateIngestionProperties = 'False''. Exception: Request is invalid and cannot be processed: Semantic error: SEM0100: 'mvexpand' operator: Failed to resolve scalar expression named 'telemetry'

我有点理解为什么不能应用该政策。使用示例脚本,数据转换有效,因为有足够的信息可以推断 telemetry 是什么,而在这种情况下,RawEvents.Event 中没有任何内容可以提供有关原始数据结构的信息将存储在 Event 列中的事件。

如何解决?这是正确的方法吗?

bag_unpack plugin 文档所示:

The plugin's output schema depends on the data values, making it as "unpredictable" as the data itself. Multiple executions of the plugin, using different data inputs, may produce different output schema.

改用well-defined转换

RawEvent
| project clientId = event.clientId, deviceId = event.deviceId, IotHubDeviceId = event.IotHubDeviceId, timestamp = event.timestamp, event.telemetry
| mv-expand event_telemetry
| extend telemetryId = event_telemetry.telemetryId, value = event_telemetry.value
| project-away event_telemetry