与前一个值的差异 —— ksqlDB
Difference with the previous value — ksqlDB
我想用 ksqlDB 实现这样一个示例：
基于两个属性（本例中为 "sensor_id" 和 "user_id"），
当这两个属性都相同时，计算 "value" 字段的变化量。
// Kafka 主题
{"timestamp": 1000000000, "user_id": "AAAAA", "sensor_id": "sensor_1111", "value": 10000}
{"timestamp": 1555550000, "user_id": "AAAAA", "sensor_id": "sensor_1111", "value": 22000}
{"timestamp": 1666660000, "user_id": "AAAAA", "sensor_id": "sensor_2222", "value": 22000}
{"timestamp": 1777770000, "user_id": "AAAAA", "sensor_id": "sensor_2222", "value": 25000}
{"timestamp": 1666660000, "user_id": "BBBBB", "sensor_id": "sensor_2222", "value": 30000}
{"timestamp": 1777770000, "user_id": "BBBBB", "sensor_id": "sensor_2222", "value": 40000}
当这两个属性都相同时，我要计算与前一条记录的差值，
以及两条记录之间经过的时间。
// 结果
{"timestamp": 1555550000, "last_timestamp": 1000000000, "user_id": "AAAAA", "sensor_id": "sensor_1111", "value": 12000}
{"timestamp": 1777770000, "last_timestamp": 1666660000, "user_id": "AAAAA", "sensor_id": "sensor_2222", "value": 3000}
{"timestamp": 1777770000, "last_timestamp": 1666660000, "user_id": "BBBBB", "sensor_id": "sensor_2222", "value": 10000}
使用用户自定义聚合函数（UDAF）应该可以实现类似的功能。
如果编写一个名为 Compare_to_previous 的
UDAF，让它保存收到的第一个值并与第二个值进行比较，就可以通过以下方式实现您想要的结果：
-- Per (user_id, sensor_id), emit the change since the previous record for that key.
-- NOTE(review): ROWTIME is the record timestamp; to use the JSON "timestamp" field instead,
-- the source stream presumably needs WITH (TIMESTAMP='timestamp') — confirm against your schema.
CREATE TABLE DIFF AS SELECT
  user_id,
  sensor_id,
  -- Compare_to_previous returns a difference, so this column holds the elapsed time
  -- between the two records, not the previous timestamp itself.
  Compare_to_previous(ROWTIME) AS last_timestamp,
  -- Change in "value" relative to the previous record for the same key.
  Compare_to_previous(value) AS value
FROM SENSOR_READINGS -- TODO: replace with the name of your source stream
GROUP BY user_id, sensor_id
-- The UDAF yields NULL until two values have been seen for a key; drop those rows.
HAVING Compare_to_previous(ROWTIME) IS NOT NULL;
其中 Compare_to_previous 的实现大致如下（未经测试）：
/**
 * ksqlDB user-defined aggregate function (UDAF) that, for each group key, returns the
 * difference between the most recent value and the value received immediately before it.
 *
 * <p>Values are compared in the order the aggregate receives them, not in timestamp order,
 * so out-of-order records produce misleading differences. Untested sketch.
 *
 * <p>NOTE(review): besides {@code java.util.List}, this needs the ksqlDB UDAF API
 * ({@code Udaf}, {@code UdafDescription}) and Guava's {@code ImmutableList} on the classpath.
 */
@UdafDescription(
    name = "COMPARE_TO_PREVIOUS",
    description = CompareToPrevious.DESCRIPTION
)
public final class CompareToPrevious {

  static final String DESCRIPTION =
      "Returns the difference between the latest value and the previous value for the same key";

  /**
   * Creates the BIGINT version of the UDAF.
   *
   * <p>The intermediate aggregate holds at most the last two non-null values seen for a key,
   * oldest first; {@code map} turns that list into their difference.
   *
   * <p>NOTE(review): per the ksqlDB UDAF documentation, factory methods are only registered
   * when they are {@code public static} and annotated with {@code @UdafFactory}
   * ({@code io.confluent.ksql.function.udaf.UdafFactory}); add that annotation when building
   * against ksqlDB.
   *
   * @return a UDAF mapping BIGINT input to the BIGINT difference from the previous value
   */
  public static Udaf<Long, List<Long>, Long> compareToPrevious() {
    return new Udaf<Long, List<Long>, Long>() {

      /** Starts each key with an empty list, meaning "no previous value yet". */
      @Override
      public List<Long> initialize() {
        return ImmutableList.of();
      }

      /**
       * Adds {@code current} to the window of the last two values seen for this key.
       * Null inputs are ignored, so they neither reset nor shift the window.
       */
      @Override
      public List<Long> aggregate(final Long current, final List<Long> previous) {
        if (current == null) {
          return previous;
        }
        switch (previous.size()) {
          case 0:
            // First value seen for this key: nothing to compare against yet.
            return ImmutableList.of(current);
          case 1:
            // Second value: compare it with the first.
            return ImmutableList.of(previous.get(0), current);
          default:
            // Already holding two values: slide the window so the newest value is compared
            // with the one immediately before it (2nd vs 1st, 3rd vs 2nd, ...).
            // For non-overlapping pairs (2nd vs 1st, 4th vs 3rd, ...) keep only the new value
            // instead: return ImmutableList.of(current);
            return ImmutableList.of(previous.get(1), current);
        }
      }

      /**
       * Not supported; as the error message states, this UDAF cannot be used with
       * session-windowed data.
       */
      @Override
      public List<Long> merge(final List<Long> aggOne, final List<Long> aggTwo) {
        throw new UnsupportedOperationException(
            "Compare_to_previous won't work with session windowed data");
      }

      /**
       * Converts the intermediate list into the output value.
       *
       * @return newest value minus the previous one, or {@code null} until two values have
       *     been seen; exclude those rows with {@code HAVING Compare_to_previous(col) IS NOT NULL}
       */
      @Override
      public Long map(final List<Long> agg) {
        if (agg.size() != 2) {
          return null;
        }
        return agg.get(1) - agg.get(0);
      }
    };
  }
}
这样的聚合器按数据到达的顺序（而不是时间戳顺序）比较值，因此无法处理乱序数据，但它适用于许多用例。
我想用 ksqlDB 实现这样一个示例：
基于两个属性（本例中为 "sensor_id" 和 "user_id"），
当这两个属性都相同时，计算 "value" 字段的变化量。
// Kafka 主题
{"timestamp": 1000000000, "user_id": "AAAAA", "sensor_id": "sensor_1111", "value": 10000}
{"timestamp": 1555550000, "user_id": "AAAAA", "sensor_id": "sensor_1111", "value": 22000}
{"timestamp": 1666660000, "user_id": "AAAAA", "sensor_id": "sensor_2222", "value": 22000}
{"timestamp": 1777770000, "user_id": "AAAAA", "sensor_id": "sensor_2222", "value": 25000}
{"timestamp": 1666660000, "user_id": "BBBBB", "sensor_id": "sensor_2222", "value": 30000}
{"timestamp": 1777770000, "user_id": "BBBBB", "sensor_id": "sensor_2222", "value": 40000}
当这两个属性都相同时，我要计算与前一条记录的差值，
以及两条记录之间经过的时间。
// 结果
{"timestamp": 1555550000, "last_timestamp": 1000000000, "user_id": "AAAAA", "sensor_id": "sensor_1111", "value": 12000}
{"timestamp": 1777770000, "last_timestamp": 1666660000, "user_id": "AAAAA", "sensor_id": "sensor_2222", "value": 3000}
{"timestamp": 1777770000, "last_timestamp": 1666660000, "user_id": "BBBBB", "sensor_id": "sensor_2222", "value": 10000}
使用用户自定义聚合函数（UDAF）应该可以实现类似的功能。
如果编写一个名为 Compare_to_previous 的
UDAF，让它保存收到的第一个值并与第二个值进行比较，就可以通过以下方式实现您想要的结果：
-- Per (user_id, sensor_id), emit the change since the previous record for that key.
-- NOTE(review): ROWTIME is the record timestamp; to use the JSON "timestamp" field instead,
-- the source stream presumably needs WITH (TIMESTAMP='timestamp') — confirm against your schema.
CREATE TABLE DIFF AS SELECT
  user_id,
  sensor_id,
  -- Compare_to_previous returns a difference, so this column holds the elapsed time
  -- between the two records, not the previous timestamp itself.
  Compare_to_previous(ROWTIME) AS last_timestamp,
  -- Change in "value" relative to the previous record for the same key.
  Compare_to_previous(value) AS value
FROM SENSOR_READINGS -- TODO: replace with the name of your source stream
GROUP BY user_id, sensor_id
-- The UDAF yields NULL until two values have been seen for a key; drop those rows.
HAVING Compare_to_previous(ROWTIME) IS NOT NULL;
其中 Compare_to_previous 的实现大致如下（未经测试）：
/**
 * ksqlDB user-defined aggregate function (UDAF) that, for each group key, returns the
 * difference between the most recent value and the value received immediately before it.
 *
 * <p>Values are compared in the order the aggregate receives them, not in timestamp order,
 * so out-of-order records produce misleading differences. Untested sketch.
 *
 * <p>NOTE(review): besides {@code java.util.List}, this needs the ksqlDB UDAF API
 * ({@code Udaf}, {@code UdafDescription}) and Guava's {@code ImmutableList} on the classpath.
 */
@UdafDescription(
    name = "COMPARE_TO_PREVIOUS",
    description = CompareToPrevious.DESCRIPTION
)
public final class CompareToPrevious {

  static final String DESCRIPTION =
      "Returns the difference between the latest value and the previous value for the same key";

  /**
   * Creates the BIGINT version of the UDAF.
   *
   * <p>The intermediate aggregate holds at most the last two non-null values seen for a key,
   * oldest first; {@code map} turns that list into their difference.
   *
   * <p>NOTE(review): per the ksqlDB UDAF documentation, factory methods are only registered
   * when they are {@code public static} and annotated with {@code @UdafFactory}
   * ({@code io.confluent.ksql.function.udaf.UdafFactory}); add that annotation when building
   * against ksqlDB.
   *
   * @return a UDAF mapping BIGINT input to the BIGINT difference from the previous value
   */
  public static Udaf<Long, List<Long>, Long> compareToPrevious() {
    return new Udaf<Long, List<Long>, Long>() {

      /** Starts each key with an empty list, meaning "no previous value yet". */
      @Override
      public List<Long> initialize() {
        return ImmutableList.of();
      }

      /**
       * Adds {@code current} to the window of the last two values seen for this key.
       * Null inputs are ignored, so they neither reset nor shift the window.
       */
      @Override
      public List<Long> aggregate(final Long current, final List<Long> previous) {
        if (current == null) {
          return previous;
        }
        switch (previous.size()) {
          case 0:
            // First value seen for this key: nothing to compare against yet.
            return ImmutableList.of(current);
          case 1:
            // Second value: compare it with the first.
            return ImmutableList.of(previous.get(0), current);
          default:
            // Already holding two values: slide the window so the newest value is compared
            // with the one immediately before it (2nd vs 1st, 3rd vs 2nd, ...).
            // For non-overlapping pairs (2nd vs 1st, 4th vs 3rd, ...) keep only the new value
            // instead: return ImmutableList.of(current);
            return ImmutableList.of(previous.get(1), current);
        }
      }

      /**
       * Not supported; as the error message states, this UDAF cannot be used with
       * session-windowed data.
       */
      @Override
      public List<Long> merge(final List<Long> aggOne, final List<Long> aggTwo) {
        throw new UnsupportedOperationException(
            "Compare_to_previous won't work with session windowed data");
      }

      /**
       * Converts the intermediate list into the output value.
       *
       * @return newest value minus the previous one, or {@code null} until two values have
       *     been seen; exclude those rows with {@code HAVING Compare_to_previous(col) IS NOT NULL}
       */
      @Override
      public Long map(final List<Long> agg) {
        if (agg.size() != 2) {
          return null;
        }
        return agg.get(1) - agg.get(0);
      }
    };
  }
}
这样的聚合器按数据到达的顺序（而不是时间戳顺序）比较值，因此无法处理乱序数据，但它适用于许多用例。