Azure Data Explorer: how to update a table from a raw JSON event using Kusto and update policies?
I am ingesting raw telemetry data as JSON records into a single-column table named RawEvents, in a column named Event. This is what a record/event looks like:
{
"clientId": "myclient1",
"IotHubDeviceId": "myiothubdevice1",
"deviceId": "mydevice1",
"timestamp": "2022-04-12T10:29:00.123",
"telemetry": [
{
"telemetryId": "total.power",
"value": 123.456
},
{
"telemetryId": "temperature",
"value": 34.56
},
...
]
}
The RawEvents table is created and set up like this:
.create table RawEvents (Event: dynamic)
.create table RawEvents ingestion json mapping 'MyRawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'
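There is also a Telemetry table that is used for querying and analysis. The Telemetry table has strongly typed columns that match the structure of the raw data records in the RawEvents table. It is created like this: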
.create table Telemetry (ClientId:string, IotHubDeviceId:string, DeviceId:string, Timestamp:datetime, TelemetryId:string, Value: real)
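To have the Telemetry table updated with new records whenever a raw event is ingested into RawEvents, I tried to define a data transformation function and use that function in an update policy attached to the Telemetry table.
To that end, I used the following script to verify that my data transformation logic works as expected: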
datatable (event:dynamic)
[
dynamic(
{
"clientId": "myclient1",
"IotHubDeviceId": "myiothubdevice1",
"deviceId": "mydevice1",
"timestamp": "2022-04-12T10:29:00.123",
"telemetry": [
{
"telemetryId": "total.power",
"value": 123.456
},
{
"telemetryId": "temperature",
"value": 34.56
}
]
}
)
]
| evaluate bag_unpack(event)
| mv-expand telemetry
| evaluate bag_unpack(telemetry)
Executing the script produces the desired output, which matches the structure of the Telemetry table:
clientId deviceId IotHubDeviceId timestamp telemetryId value
myclient1 mydevice1 myiothubdevice1 2022-04-12T10:29:00.123Z total.power 123.456
myclient1 mydevice1 myiothubdevice1 2022-04-12T10:29:00.123Z temperature 34.56
Next, I created a function named ExpandTelemetryEvent that applies the same data transformation logic to RawEvents.Event:
.create function ExpandTelemetryEvent() {
RawEvents
| evaluate bag_unpack(Event)
| mv-expand telemetry
| evaluate bag_unpack(telemetry)
}
As a last step, I tried to create an update policy for the Telemetry table that uses RawEvents as the source and ExpandTelemetryEvent() as the transformation function:
.alter table Telemetry policy update @'[{"Source": "RawEvents", "Query": "ExpandTelemetryEvent()", "IsEnabled": "True"}]'
This is where I got the following error message:
Error during execution of a policy operation: Caught exception while validating query for Update Policy: 'IsEnabled = 'True', Source = 'RawEvents', Query = 'ExpandTelemetryEvent()', IsTransactional = 'False', PropagateIngestionProperties = 'False''. Exception: Request is invalid and cannot be processed: Semantic error: SEM0100: 'mvexpand' operator: Failed to resolve scalar expression named 'telemetry'
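I somewhat understand why the policy cannot be applied. With the sample script, the data transformation works because there is enough information to infer what telemetry is, whereas in this case nothing in RawEvents.Event provides any information about the structure of the raw events that will be stored in the Event column.
How can this be solved? Is this the right approach?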
As the bag_unpack plugin documentation states:
The plugin's output schema depends on the data values, making it as "unpredictable" as the data itself. Multiple executions of the plugin, using different data inputs, may produce different output schema.
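To make the quoted point concrete, here is a small sketch (the two sample bags are made up for illustration and are trimmed versions of the event above). Each query should be run on its own. The first lists the columns clientId and deviceId, the second lists clientId and telemetry, even though the query text after the datatable is identical:

```kusto
// Query 1: a bag with clientId and deviceId.
datatable (event: dynamic) [
    dynamic({"clientId": "myclient1", "deviceId": "mydevice1"})
]
| evaluate bag_unpack(event)
| getschema

// Query 2: same operators, but the bag carries a telemetry array instead of deviceId.
datatable (event: dynamic) [
    dynamic({"clientId": "myclient1", "telemetry": [{"telemetryId": "temperature", "value": 34.56}]})
]
| evaluate bag_unpack(event)
| getschema
```

When the update policy is validated there is no data to inspect, so the telemetry column that mv-expand refers to cannot be resolved, which is consistent with the SEM0100 error above.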
Use a well-defined transformation instead:
// Every output column is named explicitly, so the schema does not depend on the data.
RawEvents
| project clientId = Event.clientId, deviceId = Event.deviceId, IotHubDeviceId = Event.IotHubDeviceId, timestamp = Event.timestamp, telemetry = Event.telemetry
| mv-expand telemetry
| extend telemetryId = telemetry.telemetryId, value = telemetry.value
| project-away telemetry
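Putting this together with the function and policy from the question, a possible end-to-end version could look like the sketch below. It assumes that the update policy query has to return the same column names, types, and order as the Telemetry table, so each property is cast explicitly and the columns are emitted in the order of the .create table Telemetry command. The alias Item is a name chosen here for illustration. Each command should be run separately.

```kusto
// Transformation with explicit names and types; the output schema is fixed
// regardless of what the ingested JSON contains.
.create-or-alter function ExpandTelemetryEvent() {
    RawEvents
    | mv-expand Item = Event.telemetry   // one row per element of the telemetry array
    | project
        ClientId       = tostring(Event.clientId),
        IotHubDeviceId = tostring(Event.IotHubDeviceId),
        DeviceId       = tostring(Event.deviceId),
        Timestamp      = todatetime(Event.timestamp),
        TelemetryId    = tostring(Item.telemetryId),
        Value          = toreal(Item.value)
}

// Compare the function's output schema with the target table before attaching the policy.
ExpandTelemetryEvent() | getschema

Telemetry | getschema

// Same policy command as in the question, now backed by a function with a static schema.
.alter table Telemetry policy update @'[{"Source": "RawEvents", "Query": "ExpandTelemetryEvent()", "IsEnabled": "True"}]'
```

Properties that are missing from an event come out as empty or null values after the casts rather than changing the shape of the result, which is what keeps the schema predictable.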