How to convert to dynamic type / apply multiple functions on the same 'pack' in KQL/Kusto

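I really like the ADX time series capabilities; until now I was using Python to process a lot of sensor data. These are the requirements for my case: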

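  1. Handle sensor data tags that arrive at different frequencies: bring them all to a 1-second frequency (if the data is in milliseconds, aggregate over a 1-second interval).
  2. Convert the stacked data to unstacked data.
  3. After unstacking, join by timestamp with another dataset that has multiple "string labels".
  4. Linearly interpolate some columns and forward-fill the others (around 10-12 in total).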

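I think I have accomplished the first three with the query below, but I am unable to use series_fill_linear directly on the columns. The documentation says this function requires a dynamic type as input. The error message is helpful: series_fill_linear(): argument #1 was not of an expected data type: dynamic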

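Is it possible to apply series_fill_linear where I am already using pack, instead of using pack again? How can I apply this function selectively by tag, and make my overall query more readable? It is important to note that only the sensor_data table needs both series_fill_linear and series_fill_forward; label_data only needs series_fill_forward.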


sensor_data
    | where timestamp > datetime(2020-11-24 00:59:59) and timestamp <datetime(2020-11-24 12:00:00) 
    | where device_number =='PRESSURE_599' 
    | where tag_name in ("tag1", "tag2", "tag3",  "tag4") 
    | make-series agg_value = avg(value) default = double(null) on timestamp in range (datetime(2020-11-24 00:59:59), datetime(2020-11-24 12:00:00), 1s) by tag_name
    | extend series_fill_linear(agg_value, double(null), false) //EDIT
    | mv-expand timestamp to typeof(datetime), agg_value to typeof(double) 
    | summarize b = make_bag(pack(tag_name, agg_value)) by timestamp
    | evaluate bag_unpack(b)
|join kind = leftouter (label_data
    | where timestamp > datetime(2020-11-24 00:58:59) and timestamp <datetime(2020-11-24 12:00:01) 
    | where device_number =='PRESSURE_599'
    | where tag != "PRESSURE_599_label_Raw" 
    | summarize x = make_bag(pack(tag, value)) by timestamp
    | evaluate bag_unpack(x)) on timestamp
    | project timestamp, 
              MY_LINEAR_COL_1 = series_fill_linear(tag1, double(null), false),
              MY_LINEAR_COL_2 = series_fill_forward(tag2),
              MY_LABEL_1 = series_fill_forward(PRESSURE_599_label_level1),
              MY_LABEL_2 = series_fill_forward(PRESSURE_599_label_level2)

Edit: I ended up using extend with case to handle the different interpolation cases.

// let forward_tags = dynamic({"tags": ["tag2","tag4"]}); unable to use this in the query as 'forward_tags.tags'

sensor_data
    | where timestamp > datetime(2020-11-24 00:59:59) and timestamp <datetime(2020-11-24 12:00:00)
    | where device_number == "PRESSURE_599"
    | where tag_name in ("tag1", "tag2", "tag3", "tag4") // use a variable here instead?
    | make-series agg_value = avg(value) 
                              default = double(null) 
                              on timestamp
                              in range (datetime(2020-11-24 00:59:59), datetime(2020-11-24 12:00:00), 1s)
                              by tag_name
    | extend agg_value = case (tag_name in ("tag2", "tag3"), // use a variable here instead?
                                series_fill_forward(agg_value, double(null)),
                                series_fill_linear(agg_value, double(null), false)
                                )
    | mv-expand timestamp to typeof(datetime), agg_value to typeof(double) 
    | summarize b = make_bag(pack(tag_name, agg_value)) by timestamp
    | evaluate bag_unpack(b)
| join kind = leftouter (  
  label_data // don't want to use make-series here; it would generate unnecessary data since this is already in 'ss' format.
    | where timestamp > datetime(2020-11-24 00:58:59) and timestamp <datetime(2020-11-24 12:00:01)
    | where tag != "PRESSURE_599_label_Raw" 
    | summarize x = make_bag(pack(tag, value)) by timestamp
    | evaluate bag_unpack(x)
    ) 
on timestamp

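I was wondering whether it is possible in KQL to pass a list of strings to a query/function for use as shown above. I have added comments at the places where I think a list of strings could be passed to make the code more readable.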

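For now, I only need to forward-fill the label columns (MY_LABEL_1, MY_LABEL_2) that result from the query above. I would like that step added to the main query, so that the final result is a table with all columns; here is a sample table based on the result of my case.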

datatable (timestamp:datetime, tag1:double, tag2:double, tag3:double, tag4:double, MY_LABEL_1: string, MY_LABEL_2: string)
    [
     datetime(2020-11-24T00:01:00Z), 1, 3, 6, 9, "x", "foo",
     datetime(2020-11-24T00:01:01Z), 1, 3, 6, 9, "", "",
     datetime(2020-11-24T00:01:02Z), 1, 3, 6, 9,"", "",
     datetime(2020-11-24T00:01:03Z), 1, 3, 6, 9,"y", "bar",
     datetime(2020-11-24T00:01:04Z), 1, 3, 6, 9,"", "",
     datetime(2020-11-24T00:01:05Z), 1, 3, 6, 9,"", "",
     ]

Series functions in ADX only work on dynamic arrays. You can apply the fill functions selectively with the case() function by replacing this line:

| extend series_fill_linear(agg_value, double(null), false) //EDIT

with something like this:

| extend agg_value = case(
        tag_name == "tag1", series_fill_linear(agg_value, double(null), false),
        tag_name == "tag2", series_fill_forward(agg_value),
        series_fill_forward(agg_value)
  )
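
To illustrate the "dynamic arrays only" point, here is a minimal sketch of one way to turn a scalar column into a dynamic array, fill it, and expand it back. The inline sample data is hypothetical, and the sketch assumes make_list_with_nulls() is used so that missing values keep their positions in the array; it is not the only way to do this.

```kusto
// Hypothetical sample data standing in for a single unpacked column.
datatable (timestamp: datetime, tag1: double)
[
    datetime(2020-11-24 01:00:00), 1.0,
    datetime(2020-11-24 01:00:01), double(null),
    datetime(2020-11-24 01:00:02), 3.0
]
| order by timestamp asc
// Collapse the scalar columns into dynamic arrays, keeping nulls in place.
| summarize timestamp = make_list(timestamp), tag1 = make_list_with_nulls(tag1)
// The series function now receives a dynamic array instead of a scalar.
| extend tag1 = series_fill_linear(tag1)
// Expand back to one row per timestamp.
| mv-expand timestamp to typeof(datetime), tag1 to typeof(double)
```

Filling right after make-series, as in the case() snippet above, avoids this round trip, because at that point agg_value is already a dynamic array.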

Edit:
Here is an example of a workaround for forward-filling a string column:

let T = datatable (Timestamp: datetime, Employee: string)
[
    datetime(2021-01-01), "Bob",
    datetime(2021-01-02), "",
    datetime(2021-01-03), "Alice",
    datetime(2021-01-04), "",
    datetime(2021-01-05), "",
    datetime(2021-01-06), "Alan",
    datetime(2021-01-07), "",
    datetime(2021-01-08), ""
]
| sort by Timestamp asc;
let employeeLookup = toscalar(T | where isnotempty(Employee) | summarize make_list(Employee));
T
| extend idx = row_cumsum(tolong(isnotempty(Employee)))
| extend EmployeeFilled = employeeLookup[idx - 1]
| project-away idx

Timestamp                    Employee  EmployeeFilled
2021-01-01 00:00:00.0000000  Bob       Bob
2021-01-02 00:00:00.0000000            Bob
2021-01-03 00:00:00.0000000  Alice     Alice
2021-01-04 00:00:00.0000000            Alice
2021-01-05 00:00:00.0000000            Alice
2021-01-06 00:00:00.0000000  Alan      Alan
2021-01-07 00:00:00.0000000            Alan
2021-01-08 00:00:00.0000000            Alan

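Regarding your requirement to convert time series with multiple frequencies to a common one, have a look at the series_downsample_fl() function in the functions library.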
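
On the side question of passing a list of strings: a minimal sketch, assuming a plain dynamic array bound with let is acceptable in place of the property bag tried in the question. The names all_tags and forward_tags and the inline sample data are hypothetical and only stand in for sensor_data.

```kusto
// Hypothetical tag lists; a plain dynamic array can be used with the in operator.
let all_tags = dynamic(["tag1", "tag2", "tag3", "tag4"]);
let forward_tags = dynamic(["tag2", "tag4"]);
// Small inline stand-in for sensor_data.
datatable (timestamp: datetime, tag_name: string, value: double)
[
    datetime(2020-11-24 01:00:00), "tag1", 1.0,
    datetime(2020-11-24 01:00:02), "tag1", 3.0,
    datetime(2020-11-24 01:00:00), "tag2", 5.0,
    datetime(2020-11-24 01:00:02), "tag2", 7.0
]
| where tag_name in (all_tags)
| make-series agg_value = avg(value) default = double(null)
    on timestamp from datetime(2020-11-24 01:00:00) to datetime(2020-11-24 01:00:03) step 1s
    by tag_name
// Forward-fill the tags listed in forward_tags, interpolate the rest linearly.
| extend agg_value = iff(tag_name in (forward_tags),
                         series_fill_forward(agg_value),
                         series_fill_linear(agg_value, double(null), false))
```

Keeping the lists in let statements at the top means each tag name is written once, and both the where filter and the fill selection read from the same definitions.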