Cassandra:如何在用户定义的函数中引用用户定义类型的字段 (Java)
Cassandra: How to reference a field in user defined type in a user defined function (Java)
在使用 Java 用户定义函数时,如何引用用户定义类型中的字段。我找到了映射、集合和元组的示例,但没有找到具有多个字段的用户定义类型的示例。
我定义了以下类型:
create type avg_type_1 (
accum tuple<text,int,double>, // source, count, sum
avg_map map<text,double> // source, average
);
以下代码:
CREATE FUNCTION average_by_source_1( state avg_type_1, source text, value double)
CALLED ON NULL INPUT
RETURNS avg_type_1
LANGUAGE java
AS $$
// when no source yet, save the first source, set the count to 1, and set the value
if (state.accum.getString(0) == null) {
state.accum.setString(0, source);
state.accum.setInt(1, 1);
state.accum.setDouble(2, value);
}
...
returns错误:
InvalidRequest: Error from server: code=2200 [Invalid query] message="Java source compilation failed:
Line 4: accum cannot be resolved or is not a field
在 Java 中,UDT 变量由 class com.datastax.driver.core.UDTValue 表示。这个 class 有 get 和 set 方法。有使用索引 (0 ...) 来标识字段的方法(按照它们在 UDT 中定义的顺序),以及使用字段名称的方法。
见API Doc。
这里有一些例子,使用问题中定义的类型:
TupleValue accumState = state.getTupleValue( "accum");
String prevSource = accumState.getString( 0);
Map<String,Double> avgMap = state.getMap( "avg_map", String.class, Double.class);
第一行从函数的状态中获取 accum 字段。可以使用索引 0(零,它是第一个字段)代替名称。
第二行获取元组的第一个元素。只能使用索引版本,因为元组的元素没有命名。
第三行获取avg_map字段。
accumState.setDouble( 2, value);
state.setTupleValue( "accum", accumState);
上面的例子设置元组中的第三个元素,然后将元组放回函数的状态变量中。请注意,您必须将元组放回状态变量中。以下无效。
// does not work
state.getTupleValue( "accum").setDouble( 2, value);
下面是完整的 UDF 示例。
// sums up until the source changes, then adds the avg to the map
// IMPORTANT: table must be ordered by source
CREATE OR REPLACE FUNCTION average_by_source_1( state avg_type_1, source text, value double)
CALLED ON NULL INPUT
RETURNS avg_type_1
LANGUAGE java
AS $$
TupleValue accumState = state.getTupleValue( "accum");
String prevSource = accumState.getString( 0);
// when no source yet, save the first source, set the count to 1, and set the value
if (prevSource == null) {
accumState.setString( 0, source);
accumState.setInt( 1, 1);
accumState.setDouble( 2, value);
state.setTupleValue( "accum", accumState);
}
// when same source, increment the count and add the value
else if (prevSource.equals( source)) {
accumState.setInt( 1, accumState.getInt( 1) + 1);
accumState.setDouble( 2, accumState.getDouble( 2) + value);
state.setTupleValue( "accum", accumState);
}
// when different source, calc average and copy to map, then re-init accumulation
else if (accumState.getInt( 1) > 0) {
double avgVal = accumState.getDouble( 2) / accumState.getInt( 1);
Map<String,Double> mapState = state.getMap( "avg_map", String.class, Double.class);
mapState.put( prevSource, avgVal);
state.setMap( "avg_map", mapState, String.class, Double.class);
accumState.setString( 0, source);
accumState.setInt( 1, 1);
accumState.setDouble( 2, value);
state.setTupleValue( "accum", accumState);
}
// should not happen - prev case uses "if" to avoid division by zero
else {
Map<String,Double> mapState = state.getMap( "avg_map", String.class, Double.class);
mapState.put( "ERROR: div by zero", null);
accumState.setString( 0, source);
accumState.setInt( 1, 1);
accumState.setDouble( 2, value);
state.setTupleValue( "accum", accumState);
}
// IMPROTANT: final function must calculate the average for the last source and
// add it to the map.
return state;
$$
;
在使用 Java 用户定义函数时,如何引用用户定义类型中的字段。我找到了映射、集合和元组的示例,但没有找到具有多个字段的用户定义类型的示例。
我定义了以下类型:
create type avg_type_1 (
accum tuple<text,int,double>, // source, count, sum
avg_map map<text,double> // source, average
);
以下代码:
CREATE FUNCTION average_by_source_1( state avg_type_1, source text, value double)
CALLED ON NULL INPUT
RETURNS avg_type_1
LANGUAGE java
AS $$
// when no source yet, save the first source, set the count to 1, and set the value
if (state.accum.getString(0) == null) {
state.accum.setString(0, source);
state.accum.setInt(1, 1);
state.accum.setDouble(2, value);
}
...
returns错误:
InvalidRequest: Error from server: code=2200 [Invalid query] message="Java source compilation failed:
Line 4: accum cannot be resolved or is not a field
在 Java 中,UDT 变量由 class com.datastax.driver.core.UDTValue 表示。这个 class 有 get 和 set 方法。有使用索引 (0 ...) 来标识字段的方法(按照它们在 UDT 中定义的顺序),以及使用字段名称的方法。
见API Doc。
这里有一些例子,使用问题中定义的类型:
TupleValue accumState = state.getTupleValue( "accum");
String prevSource = accumState.getString( 0);
Map<String,Double> avgMap = state.getMap( "avg_map", String.class, Double.class);
第一行从函数的状态中获取 accum 字段。可以使用索引 0(零,它是第一个字段)代替名称。
第二行获取元组的第一个元素。只能使用索引版本,因为元组的元素没有命名。
第三行获取avg_map字段。
accumState.setDouble( 2, value);
state.setTupleValue( "accum", accumState);
上面的例子设置元组中的第三个元素,然后将元组放回函数的状态变量中。请注意,您必须将元组放回状态变量中。以下无效。
// does not work
state.getTupleValue( "accum").setDouble( 2, value);
下面是完整的 UDF 示例。
// sums up until the source changes, then adds the avg to the map
// IMPORTANT: table must be ordered by source
CREATE OR REPLACE FUNCTION average_by_source_1( state avg_type_1, source text, value double)
CALLED ON NULL INPUT
RETURNS avg_type_1
LANGUAGE java
AS $$
TupleValue accumState = state.getTupleValue( "accum");
String prevSource = accumState.getString( 0);
// when no source yet, save the first source, set the count to 1, and set the value
if (prevSource == null) {
accumState.setString( 0, source);
accumState.setInt( 1, 1);
accumState.setDouble( 2, value);
state.setTupleValue( "accum", accumState);
}
// when same source, increment the count and add the value
else if (prevSource.equals( source)) {
accumState.setInt( 1, accumState.getInt( 1) + 1);
accumState.setDouble( 2, accumState.getDouble( 2) + value);
state.setTupleValue( "accum", accumState);
}
// when different source, calc average and copy to map, then re-init accumulation
else if (accumState.getInt( 1) > 0) {
double avgVal = accumState.getDouble( 2) / accumState.getInt( 1);
Map<String,Double> mapState = state.getMap( "avg_map", String.class, Double.class);
mapState.put( prevSource, avgVal);
state.setMap( "avg_map", mapState, String.class, Double.class);
accumState.setString( 0, source);
accumState.setInt( 1, 1);
accumState.setDouble( 2, value);
state.setTupleValue( "accum", accumState);
}
// should not happen - prev case uses "if" to avoid division by zero
else {
Map<String,Double> mapState = state.getMap( "avg_map", String.class, Double.class);
mapState.put( "ERROR: div by zero", null);
accumState.setString( 0, source);
accumState.setInt( 1, 1);
accumState.setDouble( 2, value);
state.setTupleValue( "accum", accumState);
}
// IMPROTANT: final function must calculate the average for the last source and
// add it to the map.
return state;
$$
;