Flink 中使用定义的聚合函数 - 找不到函数签名的匹配项
Use-Defined Aggregate Function in Flink - No match found for function signature
我想保留 Flink 中“Select .. From .. GROUP BY ..”查询中每个键的所有原始行。我定义了一个名为 RowToJsonAgg 的 AggregateFunction,它将行聚合成一个 Json 字符串。
class RowToJsonAgg extends AggregateFunction[String, ListBuffer[String]]{
def accumulate(accumulator: ListBuffer[String], row: Any*): Unit = {
....
// 假设该行看起来像“$field1_name, $field1_value, $field2_name, $field2_value, ...”
// 尝试从行中生成 json。然而,当我 运行 查询时,Flink 似乎找不到这个函数
}
def merge(accumulator: ListBuffer[String], its: java.lang.Iterable[ListBuffer[String]]): Unit = {
accumulator.append(
WrapAsScala.iterableAsScalaIterable(its).flatten.toList:_*
)
}
def resetAccumulator(accumulator: ListBuffer[String]): Unit = {
accumulator.clear()
}
override def getValue(accumulator: ListBuffer[String]): String = {
accumulator.mkString("{", ",", "}")
}
override def createAccumulator(): ListBuffer[String] = ListBuffer.empty
override def getAccumulatorType(): TypeInformation[ListBuffer[String]] = {
TypeInformation.of(classOf[ListBuffer[String]])
}
override def getResultType: TypeInformation[String] = TypeInformation.of(classOf[String])
}
数据 class 和查询如下所示:
case class Stock(id:Int, price: Int, volumn: Int, ts: Long)
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
bbTableEnv.createTemporarySystemFunction("row_to_json_agg", classOf[RowToJsonAgg])
val table = bbTableEnv.fromValues(...)
bbTableEnv.createTemporaryView("Stock", table)
bbTableEnv.executeSql(
"select price, row_to_json_agg('volumn', volumn, 'ts', ts) as details from Stock group by price"
)
当我 运行 应用程序时,我得到 SQL 验证异常,详细消息是“找不到函数签名的匹配项 row_to_json_agg(字符、数字、字符、数字)"
Flink 似乎找不到合适的 accumulate 函数来调用。
如果我声明累加函数如下
def accumulate(accumulator: ListBuffer[String], volumn: Integer, ts: Long)
并将查询更改为
"select price, row_to_json_agg(volumn, ts) from Stock group by price"
我遇到了同样的异常,消息是“没有找到函数签名的匹配项row_to_json_agg(NUMERIC, NUMERIC)
关于如何使聚合函数起作用的任何想法?
我自己想出来的
通过 运行 SQL 注册 UDF,如下所示:
bbTableEnv.executeSQL(
String.format("create temporary function $udf_name as '%s'", "$full_class_name_of_your_udf")
)
而不是
bbTableEnv.createTemporarySystemFunction("row_to_json_agg", classOf[RowToJsonAgg])
- 更喜欢使用 Java 来实现 UDF 而不是 Scala
我想保留 Flink 中“Select .. From .. GROUP BY ..”查询中每个键的所有原始行。我定义了一个名为 RowToJsonAgg 的 AggregateFunction,它将行聚合成一个 Json 字符串。
class RowToJsonAgg extends AggregateFunction[String, ListBuffer[String]]{
def accumulate(accumulator: ListBuffer[String], row: Any*): Unit = {
....
// 假设该行看起来像“$field1_name, $field1_value, $field2_name, $field2_value, ...” // 尝试从行中生成 json。然而,当我 运行 查询时,Flink 似乎找不到这个函数 }
def merge(accumulator: ListBuffer[String], its: java.lang.Iterable[ListBuffer[String]]): Unit = {
accumulator.append(
WrapAsScala.iterableAsScalaIterable(its).flatten.toList:_*
)
}
def resetAccumulator(accumulator: ListBuffer[String]): Unit = {
accumulator.clear()
}
override def getValue(accumulator: ListBuffer[String]): String = {
accumulator.mkString("{", ",", "}")
}
override def createAccumulator(): ListBuffer[String] = ListBuffer.empty
override def getAccumulatorType(): TypeInformation[ListBuffer[String]] = {
TypeInformation.of(classOf[ListBuffer[String]])
}
override def getResultType: TypeInformation[String] = TypeInformation.of(classOf[String])
}
数据 class 和查询如下所示:
case class Stock(id:Int, price: Int, volumn: Int, ts: Long)
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
bbTableEnv.createTemporarySystemFunction("row_to_json_agg", classOf[RowToJsonAgg])
val table = bbTableEnv.fromValues(...)
bbTableEnv.createTemporaryView("Stock", table)
bbTableEnv.executeSql(
"select price, row_to_json_agg('volumn', volumn, 'ts', ts) as details from Stock group by price"
)
当我 运行 应用程序时,我得到 SQL 验证异常,详细消息是“找不到函数签名的匹配项 row_to_json_agg(字符、数字、字符、数字)"
Flink 似乎找不到合适的 accumulate 函数来调用。
如果我声明累加函数如下
def accumulate(accumulator: ListBuffer[String], volumn: Integer, ts: Long)
并将查询更改为
"select price, row_to_json_agg(volumn, ts) from Stock group by price"
我遇到了同样的异常,消息是“没有找到函数签名的匹配项row_to_json_agg(NUMERIC, NUMERIC)
关于如何使聚合函数起作用的任何想法?
我自己想出来的
通过 运行 SQL 注册 UDF,如下所示:
bbTableEnv.executeSQL( String.format("create temporary function $udf_name as '%s'", "$full_class_name_of_your_udf") )
而不是
bbTableEnv.createTemporarySystemFunction("row_to_json_agg", classOf[RowToJsonAgg])
- 更喜欢使用 Java 来实现 UDF 而不是 Scala