Kapacitor Lambda 平均值
Kapacitor Lambda Mean
我正在尝试使用 Kapacitor batch|query 生成基线,方法是从 InfluxDB 1、2、3 和 4 周前查询相同的时间间隔,然后将其向前移动并像这样连接在一起:
var w1 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(1w).period(period).every(1m).align().groupBy(time(1m))
|shift(1w)
var w2 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(2w).period(period).every(1m).align().groupBy(time(1m))
|shift(2w)
var w3 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(3w).period(period).every(1m).align().groupBy(time(1m))
|shift(3w)
var w4 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(4w).period(period).every(1m).align().groupBy(time(1m))
|shift(4w)
var bj = w1
|join(w2, w3, w3)
.as('w1', 'w2', 'w3', 'w4')
.fill('null')
var b = bj
|eval(lambda: (""w1.mean"" + ""w2.mean"" + ""w3.mean"" + ""w4.mean"") / float(4.0))
.as('avg')
我正在使用 Full Outer Join,因为有些星期可能会缺少一个值,在这种情况下,我会将基线计算为 3 个现值的平均值。
但是,lambda 似乎不支持 Mean() 或任何此类数学函数。它似乎也不支持检查空值。
有没有办法像这样计算基线?
此外,一旦计算出基线,如何才能将其缓存起来,以便可以根据基线检查传入的流数据?
感谢任何帮助!谢谢
首先,尝试在批处理变量上使用偏移量而不是移位。
offset 将从 x 前的分钟、小时、天开始取值...
加入过程中应使用移位节点,例如:
previous
|shift(1w)
|join(current)
......
这里有一个例子:
https://github.com/influxdata/kapacitor/issues/746
关于在不同的时间加入 4 个不同的流,并且由于我之前的评论,我想它行不通...也许使用 union 而不是 join node 可行,但不确定!
你总是可以有 3 个刻度,检查当前到过去一周、2 周等等...
"since some weeks may be missing a value"
- join 将永远等待此值,不会发出其他相应的批次并导致内存泄漏。
|barrier() 节点可能有助于解决泄漏问题,但您在尝试访问缺失点属性时仍然会遇到评估错误。
您想将其分割成多个脚本,例如计算所有 4 个周期的一个,为每个周期添加一些标签,例如
|default().tag('stream', 'w1')
然后将它们发送到
|kapacitorLoopback()
第二个监听环回流的脚本,|window() 所有到达点都没有分组并计算 |mean("mean") 无论它实际得到多少个周期。
我最终使用 C# .NET Core 运行 创建了一个 UDF 作为并行进程,查询 InfluxDB,进行数学运算并缓存结果。
我正在尝试使用 Kapacitor batch|query 生成基线,方法是从 InfluxDB 1、2、3 和 4 周前查询相同的时间间隔,然后将其向前移动并像这样连接在一起:
var w1 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(1w).period(period).every(1m).align().groupBy(time(1m))
|shift(1w)
var w2 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(2w).period(period).every(1m).align().groupBy(time(1m))
|shift(2w)
var w3 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(3w).period(period).every(1m).align().groupBy(time(1m))
|shift(3w)
var w4 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(4w).period(period).every(1m).align().groupBy(time(1m))
|shift(4w)
var bj = w1
|join(w2, w3, w3)
.as('w1', 'w2', 'w3', 'w4')
.fill('null')
var b = bj
|eval(lambda: (""w1.mean"" + ""w2.mean"" + ""w3.mean"" + ""w4.mean"") / float(4.0))
.as('avg')
我正在使用 Full Outer Join,因为有些星期可能会缺少一个值,在这种情况下,我会将基线计算为 3 个现值的平均值。
但是,lambda 似乎不支持 Mean() 或任何此类数学函数。它似乎也不支持检查空值。
有没有办法像这样计算基线?
此外,一旦计算出基线,如何才能将其缓存起来,以便可以根据基线检查传入的流数据?
感谢任何帮助!谢谢
首先,尝试在批处理变量上使用偏移量而不是移位。
offset 将从 x 前的分钟、小时、天开始取值...
加入过程中应使用移位节点,例如:
previous
|shift(1w)
|join(current)
......
这里有一个例子: https://github.com/influxdata/kapacitor/issues/746
关于在不同的时间加入 4 个不同的流,并且由于我之前的评论,我想它行不通...也许使用 union 而不是 join node 可行,但不确定!
你总是可以有 3 个刻度,检查当前到过去一周、2 周等等...
"since some weeks may be missing a value" - join 将永远等待此值,不会发出其他相应的批次并导致内存泄漏。
|barrier() 节点可能有助于解决泄漏问题,但您在尝试访问缺失点属性时仍然会遇到评估错误。
您想将其分割成多个脚本,例如计算所有 4 个周期的一个,为每个周期添加一些标签,例如
|default().tag('stream', 'w1')
然后将它们发送到
|kapacitorLoopback()
第二个监听环回流的脚本,|window() 所有到达点都没有分组并计算 |mean("mean") 无论它实际得到多少个周期。
我最终使用 C# .NET Core 运行 创建了一个 UDF 作为并行进程,查询 InfluxDB,进行数学运算并缓存结果。