How to convert to dynamic type/ apply multiple functions on same 'pack' in KQL/Kusto
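I love the ADX time-series capabilities, having worked a lot on sensor data with Python. Here are the requirements for my case:
- Handle sensor data tags arriving at different frequencies: bring them all to a 1-second frequency (if they are in milliseconds, aggregate over 1-second intervals).
- Convert the stacked data to unstacked data.
- After unstacking, join another dataset that has multiple "string labels", by timestamp.
- Linearly interpolate some columns and forward-fill the others (about 10-12 in total).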
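The first requirement, sketched as a plain-Python model (not KQL, and not how ADX implements it). It assumes readings arrive as hypothetical `(epoch_ms, tag, value)` tuples: raw values are averaged per tag into 1-second slots, and empty slots stay `None`, roughly what `make-series avg(value) default = double(null) ... 1s by tag_name` produces in the queries in this thread. The half-open window `[start_s, end_s)` is a choice made for this sketch.

```python
from collections import defaultdict
from statistics import mean


def to_1s_grid(readings, start_s, end_s):
    """Average (epoch_ms, tag, value) readings into 1-second slots per tag.

    Hypothetical helper: every tag seen in [start_s, end_s) gets one slot per
    second; seconds with no readings are None (like default = double(null)).
    """
    buckets = defaultdict(list)  # (tag, second) -> raw values in that second
    for ts_ms, tag, value in readings:
        sec = ts_ms // 1000  # millisecond timestamps collapse into their second
        if start_s <= sec < end_s:
            buckets[(tag, sec)].append(value)
    tags = sorted({tag for tag, _ in buckets})
    return {
        tag: [mean(buckets[(tag, s)]) if (tag, s) in buckets else None
              for s in range(start_s, end_s)]
        for tag in tags
    }


grid = to_1s_grid([(1000, "tag1", 1.0), (1500, "tag1", 3.0),
                   (3200, "tag1", 5.0), (1100, "tag2", 10.0)], 1, 4)
print(grid)  # {'tag1': [2.0, None, 5.0], 'tag2': [10.0, None, None]}
```

Tags with no readings inside the window do not appear at all, since the sketch only creates series for tags it has seen.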
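I think I have done the first three with the query below, but I can't use series_fill_linear directly on a column. The docs say this function requires a dynamic type as input. The error message is helpful:
series_fill_linear(): argument #1 was not of an expected data type: dynamic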
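Is it possible to apply series_fill_linear where I already use pack, instead of using pack again? How can I apply this function selectively by tag, and make my overall query more readable? Note that only the sensor_data table needs both series_fill_linear and series_fill_forward; label_data only needs series_fill_forward.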
sensor_data
| where timestamp > datetime(2020-11-24 00:59:59) and timestamp <datetime(2020-11-24 12:00:00)
| where device_number =='PRESSURE_599'
| where tag_name in ("tag1", "tag2", "tag3", "tag4")
| make-series agg_value = avg(value) default = double(null) on timestamp in range (datetime(2020-11-24 00:59:59), datetime(2020-11-24 12:00:00), 1s) by tag_name
| extend series_fill_linear(agg_value, double(null), false) //EDIT
| mv-expand timestamp to typeof(datetime), agg_value to typeof(double)
| summarize b = make_bag(pack(tag_name, agg_value)) by timestamp
| evaluate bag_unpack(b)
|join kind = leftouter (label_data
| where timestamp > datetime(2020-11-24 00:58:59) and timestamp <datetime(2020-11-24 12:00:01)
| where device_number =='PRESSURE_599'
| where tag != "PRESSURE_599_label_Raw"
| summarize x = make_bag(pack(tag, value)) by timestamp
| evaluate bag_unpack(x)) on timestamp
| project timestamp,
MY_LINEAR_COL_1 = series_fill_linear(tag1, double(null), false),
MY_LINEAR_COL_2 = series_fill_forward(tag2),
MY_LABEL_1 = series_fill_forward(PRESSURE_599_label_level1),
MY_LABEL_2 = series_fill_forward(PRESSURE_599_label_level2)
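A plain-Python model (hypothetical helper, not KQL) of the unstacking step, `summarize b = make_bag(pack(tag_name, agg_value)) by timestamp | evaluate bag_unpack(b)`: stacked `(timestamp, tag, value)` rows become one row per timestamp with one column per tag. It also shows why the final `project ... series_fill_linear(tag1, ...)` fails: after unpacking, each tag column holds one scalar per row, not an array.

```python
def unstack(rows):
    """Pivot stacked (timestamp, tag, value) rows to one dict per timestamp."""
    bags = {}  # timestamp -> {tag: value}, like make_bag(pack(tag, value)) by timestamp
    for ts, tag, value in rows:
        bags.setdefault(ts, {})[tag] = value
    columns = sorted({tag for bag in bags.values() for tag in bag})
    # "bag_unpack": one column per tag; tags absent at a timestamp become None here.
    return [{"timestamp": ts, **{c: bags[ts].get(c) for c in columns}}
            for ts in sorted(bags)]


rows = [(2, "tag1", 1.0), (1, "tag1", 0.5), (1, "tag2", 9.0)]
for row in unstack(rows):
    print(row)
# {'timestamp': 1, 'tag1': 0.5, 'tag2': 9.0}
# {'timestamp': 2, 'tag1': 1.0, 'tag2': None}
```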
Edit: I ended up using extend and case to handle the different interpolation cases.
// let forward_tags = dynamic({"tags": ["tag2","tag4"]}); can't use this in the query as "forward_tags.tags"
sensor_data
| where timestamp > datetime(2020-11-24 00:59:59) and timestamp <datetime(2020-11-24 12:00:00)
| where device_number == "PRESSURE_599"
| where tag_name in ("tag1", "tag2", "tag3", "tag4") // use a variable here instead?
| make-series agg_value = avg(value)
default = double(null)
on timestamp
in range (datetime(2020-11-24 00:59:59), datetime(2020-11-24 12:00:00), 1s)
by tag_name
| extend agg_value = case (tag_name in ("tag2", "tag3"), // use a variable here instead?
series_fill_forward(agg_value, double(null)),
series_fill_linear(agg_value, double(null), false)
)
| mv-expand timestamp to typeof(datetime), agg_value to typeof(double)
| summarize b = make_bag(pack(tag_name, agg_value)) by timestamp
| evaluate bag_unpack(b)
| join kind = leftouter (
label_data // don't want to use make-series here; it would generate unnecessary data since it's already in 'ss' format.
| where timestamp > datetime(2020-11-24 00:58:59) and timestamp <datetime(2020-11-24 12:00:01)
| where tag != "PRESSURE_599_label_Raw"
| summarize x = make_bag(pack(tag, value)) by timestamp
| evaluate bag_unpack(x)
)
on timestamp
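A simplified plain-Python model (hypothetical helper, not KQL) of the `join kind = leftouter (...) on timestamp` step: every unstacked sensor row is kept and gets the label columns, which are empty (`None`) at seconds without a label row. Those gaps are exactly what the label columns then need forward-filled. The sketch assumes at most one label row per timestamp and drops the right-hand key; a real Kusto join can fan out on duplicate keys and, as far as I know, returns the right-side key as an extra `timestamp1` column.

```python
def left_outer_join(left, right, key="timestamp"):
    """Keep every left row; add right-side columns, None where no key matches."""
    right_by_key = {row[key]: row for row in right}  # assumes unique right keys
    right_cols = sorted({c for row in right for c in row if c != key})
    joined = []
    for row in left:
        match = right_by_key.get(row[key], {})
        joined.append({**row, **{c: match.get(c) for c in right_cols}})
    return joined


sensor = [{"timestamp": 1, "tag1": 1.0}, {"timestamp": 2, "tag1": 2.0}]
labels = [{"timestamp": 1, "MY_LABEL_1": "x"}]
print(left_outer_join(sensor, labels))
# [{'timestamp': 1, 'tag1': 1.0, 'MY_LABEL_1': 'x'}, {'timestamp': 2, 'tag1': 2.0, 'MY_LABEL_1': None}]
```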
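I'm wondering whether it's possible in KQL to pass a list of strings to a query/function to use as below. I've put comments where I think I could pass a list of strings to make the code more readable.
Right now I only need to fill_forward the label columns (MY_LABEL_1, MY_LABEL_2), which are the result of the query below. I'd like this code added to the main query, with the final result being a table containing all columns. Here is a sample table based on my case's result: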
datatable (timestamp:datetime, tag1:double, tag2:double, tag3:double, tag4:double, MY_LABEL_1: string, MY_LABEL_2: string)
[
datetime(2020-11-24T00:01:00Z), 1, 3, 6, 9, "x", "foo",
datetime(2020-11-24T00:01:01Z), 1, 3, 6, 9, "", "",
datetime(2020-11-24T00:01:02Z), 1, 3, 6, 9,"", "",
datetime(2020-11-24T00:01:03Z), 1, 3, 6, 9,"y", "bar",
datetime(2020-11-24T00:01:04Z), 1, 3, 6, 9,"", "",
datetime(2020-11-24T00:01:05Z), 1, 3, 6, 9,"", "",
]
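Series functions in ADX work only on dynamic arrays. You can apply the fill functions selectively with the case() function, by replacing this line:
| extend series_fill_linear(agg_value, double(null), false) //EDIT
with something like this: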
| extend agg_value = case(
tag_name == "tag1", series_fill_linear(agg_value, double(null), false),
tag_name == "tag2", series_fill_forward(agg_value),
series_fill_forward(agg_value)
)
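Why series_fill_linear wants a dynamic array rather than a column value: interpolation needs the neighbouring non-null points, so it has to see the whole series at once. The plain-Python sketch below is a model of the semantics only (not KQL, not ADX's implementation). It interpolates `None` gaps, leaves edge gaps untouched by default like the `false` fill_edges argument used in this thread, forward-fills, and picks a fill per tag the way the case() expressions here do. `FORWARD_TAGS` is a hypothetical stand-in for the tag list.

```python
def fill_linear(series, fill_edges=False):
    """Interpolate None gaps between known points (cf. series_fill_linear)."""
    out = list(series)
    known = [i for i, v in enumerate(out) if v is not None]
    for left, right in zip(known, known[1:]):
        step = (out[right] - out[left]) / (right - left)
        for i in range(left + 1, right):
            out[i] = out[left] + step * (i - left)
    if fill_edges and known:
        # Leading/trailing gaps take the nearest known value.
        for i in range(known[0]):
            out[i] = out[known[0]]
        for i in range(known[-1] + 1, len(out)):
            out[i] = out[known[-1]]
    return out  # with fill_edges=False, edge gaps stay None


def fill_forward(series):
    """Carry the last known value forward (cf. series_fill_forward)."""
    out, last = [], None
    for v in series:
        last = v if v is not None else last
        out.append(last)  # leading None values stay None
    return out


FORWARD_TAGS = {"tag2", "tag3"}  # hypothetical: tags that get forward fill


def fill_by_tag(series_by_tag, forward_tags=FORWARD_TAGS):
    """Pick a fill per tag, like case(tag_name in (...), forward, linear)."""
    return {
        tag: fill_forward(s) if tag in forward_tags else fill_linear(s)
        for tag, s in series_by_tag.items()
    }


print(fill_by_tag({"tag1": [1.0, None, 3.0, None],
                   "tag2": [None, 5.0, None, None]}))
# {'tag1': [1.0, 2.0, 3.0, None], 'tag2': [None, 5.0, 5.0, 5.0]}
```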
Edit:
Here is an example of a workaround for forward-filling a string column:
let T = datatable ( Timestamp: datetime, Employee: string )
[ datetime(2021-01-01), "Bob",
datetime(2021-01-02), "",
datetime(2021-01-03), "Alice",
datetime(2021-01-04), "",
datetime(2021-01-05), "",
datetime(2021-01-06), "Alan",
datetime(2021-01-07), "",
datetime(2021-01-08), "" ]
| sort by Timestamp asc;
let employeeLookup = toscalar(T | where isnotempty(Employee) | summarize make_list(Employee));
T
| extend idx = row_cumsum(tolong(isnotempty(Employee)))
| extend EmployeeFilled = employeeLookup[idx - 1]
| project-away idx
Timestamp | Employee | EmployeeFilled |
---|---|---|
2021-01-01 00:00:00.0000000 | Bob | Bob |
2021-01-02 00:00:00.0000000 | | Bob |
2021-01-03 00:00:00.0000000 | Alice | Alice |
2021-01-04 00:00:00.0000000 | | Alice |
2021-01-05 00:00:00.0000000 | | Alice |
2021-01-06 00:00:00.0000000 | Alan | Alan |
2021-01-07 00:00:00.0000000 | | Alan |
2021-01-08 00:00:00.0000000 | | Alan |
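The same row_cumsum trick as a plain-Python model (hypothetical helper, for illustration only): keep the non-empty values in row order, count non-empty rows with a running sum, and index the list with that count minus one. Like the KQL version, it relies on the rows already being sorted by time. For rows before the first non-empty value the count is 0; the sample data never hits that case, and this sketch simply returns an empty string there.

```python
from itertools import accumulate


def forward_fill_strings(values):
    """Forward-fill empty strings using a lookup list and a running count."""
    lookup = [v for v in values if v]  # ~ make_list of non-empty values
    counts = accumulate(1 if v else 0 for v in values)  # ~ row_cumsum(...)
    # ~ lookup[idx - 1]; a count of 0 means nothing has been seen yet.
    return [lookup[n - 1] if n > 0 else "" for n in counts]


print(forward_fill_strings(["Bob", "", "Alice", "", "", "Alan", "", ""]))
# ['Bob', 'Bob', 'Alice', 'Alice', 'Alice', 'Alan', 'Alan', 'Alan']
```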
For converting multi-frequency time series to a common time series, have a look at series_downsample_fl() in the functions library.