Difference with the previous value in ksqlDB

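I am trying to build an example with ksqlDB.

Records are matched on two attributes, in this case "sensor_id" and "user_id".

When both attributes match, I want to calculate the change in the "value" field.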

// Kafka topic

{"timestamp": 1000000000, "user_id": "AAAAA", "sensor_id": "sensor_1111", "value": 10000}
{"timestamp": 1555550000, "user_id": "AAAAA", "sensor_id": "sensor_1111", "value": 22000}

{"timestamp": 1666660000, "user_id": "AAAAA", "sensor_id": "sensor_2222", "value": 22000}
{"timestamp": 1777770000, "user_id": "AAAAA", "sensor_id": "sensor_2222", "value": 25000}

{"timestamp": 1666660000, "user_id": "BBBBB", "sensor_id": "sensor_2222", "value": 30000}
{"timestamp": 1777770000, "user_id": "BBBBB", "sensor_id": "sensor_2222", "value": 40000}

When both attributes match, I want the difference from the previous value,

as well as the elapsed time.

// Result

{"timestamp": 1555550000, "last_timestamp": 1000000000, "user_id": "AAAAA", "sensor_id": "sensor_1111", "value": 12000}
{"timestamp": 1777770000, "last_timestamp": 1666660000, "user_id": "AAAAA", "sensor_id": "sensor_2222", "value": 3000}
{"timestamp": 1777770000, "last_timestamp": 1666660000, "user_id": "BBBBB", "sensor_id": "sensor_2222", "value": 10000}

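Something like this should be possible with a custom user-defined aggregate function (UDAF).

If you were to write a Compare_to_previous UDAF that stores the first value it receives and compares it with the second, you could get what you want with: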

CREATE TABLE DIFF AS SELECT
   user_id,
   sensor_id,
   Compare_to_previous(ROWTIME) AS last_timestamp,
   Compare_to_previous(value) AS value
   FROM SENSOR_READINGS  -- placeholder name: substitute your own source stream
   GROUP BY user_id, sensor_id
   HAVING Compare_to_previous(ROWTIME) IS NOT NULL;

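where compare_to_previous might look something like this (untested). Note that, as written, it returns the difference between two values, so applying it to ROWTIME yields the elapsed time rather than the previous timestamp: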

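import com.google.common.collect.ImmutableList;
import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import java.util.List;

@UdafDescription(
    name = "COMPARE_TO_PREVIOUS",
    description = CompareToPrevious.DESCRIPTION
)
public final class CompareToPrevious {

  static final String DESCRIPTION =
      "Returns the difference between the current value and the previous one";

  // UDAF version for BIGINT. ksqlDB discovers UDAFs through public static
  // factory methods annotated with @UdafFactory.
  @UdafFactory(description = "Compares a BIGINT value with the previous one")
  public static Udaf<Long, List<Long>, Long> compareToPrevious() {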
    return new Udaf<Long, List<Long>, Long>() {

      @Override
      public List<Long> initialize() {
        // Initial value is empty() meaning no previous.
        return ImmutableList.of();
      }

      @Override
      public List<Long> aggregate(final Long current, final List<Long> previous) {
        if (current == null) {
          // Ignore null values? Your choice!
          return previous;
        }

        switch (previous.size()) {
          case 0:
            // 1st value seen for this key:
            return ImmutableList.of(current);
          case 1:
            // 2nd value seen for this key:
            return ImmutableList.of(previous.get(0), current);
          default:
            // Already seen two values - what to do?
            // Either compare pairs,
            // i.e. compare 2nd with 1st, then 4th with 3rd, etc.:
            //   return ImmutableList.of(current);
            // or compare each subsequent row with the previous one,
            // i.e. compare 2nd with 1st, then 3rd with 2nd, etc.:
            return ImmutableList.of(previous.get(1), current);
        }
      }

      @Override
      public List<Long> merge(final List<Long> aggOne, final List<Long> aggTwo) {
        throw new UnsupportedOperationException(
            "Compare_to_previous won't work with session windowed data");
      }

      // Called to convert the intermediate List<Long> into an output value.
      @Override
      public Long map(final List<Long> agg) {
        if (agg.size() != 2) {
          // If we don't have two values to compare we output NULL.
          // Use HAVING Compare_To_Previous(COL) IS NOT NULL in the query to exclude these null rows.
          return null;
        }

        // We have two values - calculate the diff:
        return ((long) agg.get(1)) - agg.get(0);
      }
    };
  }
}

An aggregator like this will not handle out-of-order data, but it will work for many use cases.
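
To see why out-of-order data is a problem, here is a small plain-Java sketch that mimics the aggregate and map steps without any ksqlDB dependency. The class `OutOfOrderDemo` and its method names are hypothetical stand-ins, and the three values are illustrative. It assumes the "compare each row with the previous one" variant, and it diffs values strictly in arrival order, which is all the aggregator ever sees.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OutOfOrderDemo {

  /** Stand-in for the aggregate step: keep the previous value and the current one. */
  static List<Long> aggregate(Long current, List<Long> previous) {
    if (current == null) {
      return previous;
    }
    if (previous.isEmpty()) {
      return Arrays.asList(current);
    }
    // Slide the window: the most recent stored value becomes "previous".
    return Arrays.asList(previous.get(previous.size() - 1), current);
  }

  /** Stand-in for the map step: the difference, or null with fewer than two values. */
  static Long map(List<Long> agg) {
    return agg.size() == 2 ? Long.valueOf(agg.get(1) - agg.get(0)) : null;
  }

  /** Feeds values one by one, in arrival order, and collects the non-null diffs. */
  static List<Long> diffsInArrivalOrder(List<Long> values) {
    List<Long> state = new ArrayList<>();
    List<Long> out = new ArrayList<>();
    for (Long v : values) {
      state = aggregate(v, state);
      Long diff = map(state);
      if (diff != null) {
        out.add(diff);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Records arrive in event-time order.
    System.out.println(diffsInArrivalOrder(Arrays.asList(10000L, 22000L, 25000L)));
    // prints [12000, 3000]

    // The last two records arrive swapped: the diffs no longer reflect event time.
    System.out.println(diffsInArrivalOrder(Arrays.asList(10000L, 25000L, 22000L)));
    // prints [15000, -3000]
  }
}
```

Because the aggregation state holds only values and not their timestamps, it has no way to detect or repair the swap in the second call.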