Kapacitor Lambda 平均值

Kapacitor Lambda Mean

我正在尝试使用 Kapacitor batch|query 生成基线,方法是从 InfluxDB 1、2、3 和 4 周前查询相同的时间间隔,然后将其向前移动并像这样连接在一起:

var w1 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(1w).period(period).every(1m).align().groupBy(time(1m))
    |shift(1w)

var w2 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(2w).period(period).every(1m).align().groupBy(time(1m))
    |shift(2w)

var w3 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(3w).period(period).every(1m).align().groupBy(time(1m))
    |shift(3w)

var w4 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(4w).period(period).every(1m).align().groupBy(time(1m))
    |shift(4w)

var bj = w1
    |join(w2, w3, w3)
        .as('w1', 'w2', 'w3', 'w4')
        .fill('null')

var b = bj
    |eval(lambda: (""w1.mean"" + ""w2.mean"" + ""w3.mean"" + ""w4.mean"") / float(4.0))
        .as('avg')

我正在使用 Full Outer Join,因为有些星期可能会缺少一个值,在这种情况下,我会将基线计算为 3 个现值的平均值。

但是,lambda 似乎不支持 Mean() 或任何此类数学函数。它似乎也不支持检查空值。

有没有办法像这样计算基线?

此外,一旦计算出基线,如何才能将其缓存起来,以便可以根据基线检查传入的流数据?

感谢任何帮助!谢谢

首先,尝试在批处理变量上使用偏移量而不是移位。

offset 将从 x 前的分钟、小时、天开始取值...

加入过程中应使用移位节点,例如:

previous
    |shift(1w)
    |join(current)
    ......

这里有一个例子: https://github.com/influxdata/kapacitor/issues/746

关于在不同的时间加入 4 个不同的流,并且由于我之前的评论,我想它行不通...也许使用 union 而不是 join node 可行,但不确定!

你总是可以有 3 个刻度,检查当前到过去一周、2 周等等...

"since some weeks may be missing a value" - join 将永远等待此值,不会发出其他相应的批次并导致内存泄漏。

|barrier() 节点可能有助于解决泄漏问题,但您在尝试访问缺失点属性时仍然会遇到评估错误。

您想将其分割成多个脚本,例如计算所有 4 个周期的一个,为每个周期添加一些标签,例如

|default().tag('stream', 'w1')

然后将它们发送到

|kapacitorLoopback()

第二个监听环回流的脚本,|window() 所有到达点都没有分组并计算 |mean("mean") 无论它实际得到多少个周期。

我最终使用 C# .NET Core 运行 创建了一个 UDF 作为并行进程,查询 InfluxDB,进行数学运算并缓存结果。