为什么 ksql 无法识别 min 函数
Why min function is not recognized by ksql
我正在使用 confluent 编写查询以在 5 分钟内获取第一个时间戳 window kafka 主题。这是查询(我知道这不是很好的方法):
CREATE STREAM start_metric_value AS
select metric_value
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
where metric_datetime_utc = MIN(TIMESTAMPTOSTRING(metric_datetime_utc, 'yyyy-MM-dd HH:mm:ss')) LIMIT 1;
但是我有这个错误:
Code generation failed for Predicate: Can't find any functions with
the name 'MIN'. expression:(METRIC_DATETIME_UTC =
MIN(TIMESTAMPTOSTRING(METRIC_DATETIME_UTC, 'yyyy-MM-dd HH:mm:ss'))),
schema:ROWKEY
STRING KEY, ID
STRING, METRIC_NAME
STRING,
METRIC_VALUE
STRING, METRIC_DATETIME_UTC
BIGINT, METRIC_INDEX
STRING, IANA_TIMEZONE
STRING, PROCESSED_DATETIME_UTC
BIGINT,
DATA_TYPE
STRING, ASSET_TYPE
STRING, ROWTIME
BIGINT, ROWKEY
STRING Caused by: Can't find any functions with the name 'MIN'
谁知道如何解决这个问题
不是 100% 清楚您要实现的目标。请参阅上面关于添加更多详细信息以帮助人们了解您要实现的目标的问题的评论。
也就是说,我可以说....
Min
函数未被识别的原因有两个:
- 您正在将
TIMESTAMPTOSTRING
的输出传递给 MIN
,但 MIN
不接受字符串。
- 您不能在
WHERE
子句中使用聚合函数。
您看到的错误消息看起来像是一个错误。如果它仍然存在于最新版本的 ksqlDB 上,您可能需要 raise an issue in the ksqlDB GitHub project.
即使更正您查询的这两件事仍然会失败,因为在 ksqlDB 中 windowing 需要聚合,因此您需要 GROUP BY
.
例如,如果您想每 5 分钟 metric_value
捕获最小 metric_datetime_utc
window,您可以使用:
CREATE TABLE start_metric_value AS
SELECT
metric_value,
MIN(metric_datetime_utc) as minTs
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY metric_value;
这将创建一个 windowed table,即 table,其中密钥由 metric_value
和 WINDOWSTART
时间组成。 minTs
将存储看到的最小日期时间。
让我们运行通过查询一些数据来了解发生了什么:
输入:
rowtime | metric_value | metric_datetime_utc
--------|---------------|--------------------
1 | A | 3
2 | A | 4
3 | A | 2
4 | B | 5
300000 | A | 6
START_METRIC_VALUE
主题的输出可能是(注意:metric_Value 和 windowStart 将存储在 Kafka 记录的键中,而 minTs 将存储在值中):
metric_value | windowStart | minTs
-------------|-------------|------
A | 0 | 3
A | 0 | 3
A | 0 | 2
B | 0 | 5
A | 300000 | 6
主题实际输出什么将取决于您cache.max.bytes.buffering
的值。将其设置为 0
,关闭缓冲,将看到上面的输出。然而,启用缓冲后,一些中间结果可能不会输出到 Kafka,尽管每个 window 的最终结果将保持不变。您还可以使用即将到来的 SUPPRESS functionality
控制输出到 Kafka 的内容
上述解决方案为您提供了每个 metric_value 的最小时间戳。如果您想要每个 window 看到的全局最小日期时间,那么您可以 GROUP BY
一个常量。请注意,这会将所有事件路由到单个 ksqlDB 节点,因此它不能作为解决方案很好地扩展。如果缩放是一个问题,则有解决方案,例如就像先计算最小值 metric_value
然后 post-processing 这样找到全局最小值。
CREATE TABLE start_metric_value AS
SELECT
1 as Key,
MIN(metric_datetime_utc) as minTs
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY 1;
注意:ksqlDB 0.10 版的语法是正确的。您可能需要针对其他版本进行调整。
我正在使用 confluent 编写查询以在 5 分钟内获取第一个时间戳 window kafka 主题。这是查询(我知道这不是很好的方法):
CREATE STREAM start_metric_value AS
select metric_value
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
where metric_datetime_utc = MIN(TIMESTAMPTOSTRING(metric_datetime_utc, 'yyyy-MM-dd HH:mm:ss')) LIMIT 1;
但是我有这个错误:
Code generation failed for Predicate: Can't find any functions with the name 'MIN'. expression:(METRIC_DATETIME_UTC = MIN(TIMESTAMPTOSTRING(METRIC_DATETIME_UTC, 'yyyy-MM-dd HH:mm:ss'))), schema:
ROWKEY
STRING KEY,ID
STRING,METRIC_NAME
STRING,METRIC_VALUE
STRING,METRIC_DATETIME_UTC
BIGINT,METRIC_INDEX
STRING,IANA_TIMEZONE
STRING,PROCESSED_DATETIME_UTC
BIGINT,DATA_TYPE
STRING,ASSET_TYPE
STRING,ROWTIME
BIGINT,ROWKEY
STRING Caused by: Can't find any functions with the name 'MIN'
谁知道如何解决这个问题
不是 100% 清楚您要实现的目标。请参阅上面关于添加更多详细信息以帮助人们了解您要实现的目标的问题的评论。
也就是说,我可以说....
Min
函数未被识别的原因有两个:
- 您正在将
TIMESTAMPTOSTRING
的输出传递给MIN
,但MIN
不接受字符串。 - 您不能在
WHERE
子句中使用聚合函数。
您看到的错误消息看起来像是一个错误。如果它仍然存在于最新版本的 ksqlDB 上,您可能需要 raise an issue in the ksqlDB GitHub project.
即使更正您查询的这两件事仍然会失败,因为在 ksqlDB 中 windowing 需要聚合,因此您需要 GROUP BY
.
例如,如果您想每 5 分钟 metric_value
捕获最小 metric_datetime_utc
window,您可以使用:
CREATE TABLE start_metric_value AS
SELECT
metric_value,
MIN(metric_datetime_utc) as minTs
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY metric_value;
这将创建一个 windowed table,即 table,其中密钥由 metric_value
和 WINDOWSTART
时间组成。 minTs
将存储看到的最小日期时间。
让我们运行通过查询一些数据来了解发生了什么:
输入:
rowtime | metric_value | metric_datetime_utc
--------|---------------|--------------------
1 | A | 3
2 | A | 4
3 | A | 2
4 | B | 5
300000 | A | 6
START_METRIC_VALUE
主题的输出可能是(注意:metric_Value 和 windowStart 将存储在 Kafka 记录的键中,而 minTs 将存储在值中):
metric_value | windowStart | minTs
-------------|-------------|------
A | 0 | 3
A | 0 | 3
A | 0 | 2
B | 0 | 5
A | 300000 | 6
主题实际输出什么将取决于您cache.max.bytes.buffering
的值。将其设置为 0
,关闭缓冲,将看到上面的输出。然而,启用缓冲后,一些中间结果可能不会输出到 Kafka,尽管每个 window 的最终结果将保持不变。您还可以使用即将到来的 SUPPRESS functionality
上述解决方案为您提供了每个 metric_value 的最小时间戳。如果您想要每个 window 看到的全局最小日期时间,那么您可以 GROUP BY
一个常量。请注意,这会将所有事件路由到单个 ksqlDB 节点,因此它不能作为解决方案很好地扩展。如果缩放是一个问题,则有解决方案,例如就像先计算最小值 metric_value
然后 post-processing 这样找到全局最小值。
CREATE TABLE start_metric_value AS
SELECT
1 as Key,
MIN(metric_datetime_utc) as minTs
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY 1;
注意:ksqlDB 0.10 版的语法是正确的。您可能需要针对其他版本进行调整。