为什么 ksql 无法识别 min 函数

Why min function is not recognized by ksql

我正在使用 confluent 编写查询以在 5 分钟内获取第一个时间戳 window kafka 主题。这是查询(我知道这不是很好的方法):

CREATE STREAM start_metric_value AS
select metric_value 
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
where metric_datetime_utc = MIN(TIMESTAMPTOSTRING(metric_datetime_utc, 'yyyy-MM-dd HH:mm:ss')) LIMIT 1;

但是我有这个错误:

Code generation failed for Predicate: Can't find any functions with the name 'MIN'. expression:(METRIC_DATETIME_UTC = MIN(TIMESTAMPTOSTRING(METRIC_DATETIME_UTC, 'yyyy-MM-dd HH:mm:ss'))), schema:ROWKEY STRING KEY, ID STRING, METRIC_NAME STRING, METRIC_VALUE STRING, METRIC_DATETIME_UTC BIGINT, METRIC_INDEX STRING, IANA_TIMEZONE STRING, PROCESSED_DATETIME_UTC BIGINT, DATA_TYPE STRING, ASSET_TYPE STRING, ROWTIME BIGINT, ROWKEY STRING Caused by: Can't find any functions with the name 'MIN'

谁知道如何解决这个问题

不是 100% 清楚您要实现的目标。请参阅上面关于添加更多详细信息以帮助人们了解您要实现的目标的问题的评论。

也就是说,我可以说....

Min 函数未被识别的原因有两个:

  • 您正在将 TIMESTAMPTOSTRING 的输出传递给 MIN,但 MIN 不接受字符串。
  • 您不能在 WHERE 子句中使用聚合函数。

您看到的错误消息看起来像是一个错误。如果它仍然存在于最新版本的 ksqlDB 上,您可能需要 raise an issue in the ksqlDB GitHub project.

即使更正您查询的这两件事仍然会失败,因为在 ksqlDB 中 windowing 需要聚合,因此您需要 GROUP BY.

例如,如果您想每 5 分钟 metric_value 捕获最小 metric_datetime_utc window,您可以使用:

CREATE TABLE start_metric_value AS
  SELECT
    metric_value,
    MIN(metric_datetime_utc) as minTs
  FROM dataaggregaion 
  WINDOW TUMBLING (SIZE 5 MINUTE)
  GROUP BY metric_value;

这将创建一个 windowed table,即 table,其中密钥由 metric_valueWINDOWSTART 时间组成。 minTs 将存储看到的最小日期时间。

让我们运行通过查询一些数据来了解发生了什么:

输入:

rowtime | metric_value  | metric_datetime_utc
--------|---------------|--------------------
 1      |  A            | 3
 2      |  A            | 4
 3      |  A            | 2
 4      |  B            | 5
 300000 |  A            | 6

START_METRIC_VALUE 主题的输出可能是(注意:metric_Value 和 windowStart 将存储在 Kafka 记录的键中,而 minTs 将存储在值中):

metric_value | windowStart | minTs 
-------------|-------------|------
 A           | 0           | 3
 A           | 0           | 3
 A           | 0           | 2
 B           | 0           | 5
 A           | 300000      | 6

主题实际输出什么将取决于您cache.max.bytes.buffering的值。将其设置为 0,关闭缓冲,将看到上面的输出。然而,启用缓冲后,一些中间结果可能不会输出到 Kafka,尽管每个 window 的最终结果将保持不变。您还可以使用即将到来的 SUPPRESS functionality

控制输出到 Kafka 的内容

上述解决方案为您提供了每个 metric_value 的最小时间戳。如果您想要每个 window 看到的全局最小日期时间,那么您可以 GROUP BY 一个常量。请注意,这会将所有事件路由到单个 ksqlDB 节点,因此它不能作为解决方案很好地扩展。如果缩放是一个问题,则有解决方案,例如就像先计算最小值 metric_value 然后 post-processing 这样找到全局最小值。

CREATE TABLE start_metric_value AS
  SELECT
    1 as Key,
    MIN(metric_datetime_utc) as minTs
  FROM dataaggregaion 
  WINDOW TUMBLING (SIZE 5 MINUTE)
  GROUP BY 1;

注意:ksqlDB 0.10 版的语法是正确的。您可能需要针对其他版本进行调整。