Cassandra: How to reference a field in user defined type in a user defined function (Java)

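When writing a Java user defined function, how do I reference a field in a user defined type? I have found examples for maps, sets and tuples, but none for a user defined type with multiple fields.

I have the following type defined: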

create type avg_type_1 (
  accum tuple<text,int,double>,   // source, count, sum
  avg_map map<text,double>        // source, average
);

The following code:

CREATE FUNCTION average_by_source_1( state avg_type_1, source text, value double)
    CALLED ON NULL INPUT
    RETURNS avg_type_1
    LANGUAGE java
    AS $$

        // when no source yet, save the first source, set the count to 1, and set the value
        if (state.accum.getString(0) == null) {
            state.accum.setString(0, source);
            state.accum.setInt(1, 1);
            state.accum.setDouble(2, value);
        }
        ...

returns the error:

InvalidRequest: Error from server: code=2200 [Invalid query] message="Java source compilation failed:
Line 4: accum cannot be resolved or is not a field

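In Java, a UDT variable is represented by the class com.datastax.driver.core.UDTValue. This class has get and set methods. Some of them identify a field by index (0, ...), following the order in which the fields are defined in the UDT, and others identify it by field name.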

API Doc

Here are some examples, using the type defined in the question:

TupleValue accumState = state.getTupleValue( "accum");
String prevSource = accumState.getString( 0);
Map<String,Double> avgMap = state.getMap( "avg_map", String.class, Double.class);

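The first line gets the accum field from the function's state. The index 0 (zero, since it is the first field) could be used instead of the name.

The second line gets the first element of the tuple. Only the index version can be used, because the elements of a tuple are not named.

The third line gets the avg_map field.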

accumState.setDouble( 2, value);
state.setTupleValue( "accum", accumState);

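The example above sets the third element of the tuple and then puts the tuple back into the function's state variable. Note that you must put the tuple back into the state variable. The following does not work.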

// does not work
state.getTupleValue( "accum").setDouble( 2, value);
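Why the one-liner has no effect can be illustrated with a toy model in plain Java. This is only a sketch under an assumption: that a getter on the state returns a freshly built copy of the stored field rather than a live reference. ToyUdt, getTuple and setTuple are hypothetical stand-ins invented for the illustration, not driver classes or methods.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class CopyOnGetDemo {

    // Hypothetical stand-in for a UDT value (NOT the driver's UDTValue).
    // Assumption being modelled: every getter returns a fresh copy of the stored field.
    static final class ToyUdt {
        private final Map<String, double[]> fields = new HashMap<>();

        // Returns a copy, so changes to the result do not reach the stored field.
        double[] getTuple(String name) {
            double[] stored = fields.get(name);
            return Arrays.copyOf(stored, stored.length);
        }

        // Stores a copy of the given value under the field name.
        void setTuple(String name, double[] value) {
            fields.put(name, Arrays.copyOf(value, value.length));
        }
    }

    public static void main(String[] args) {
        ToyUdt state = new ToyUdt();
        state.setTuple("accum", new double[] {0.0, 0.0, 0.0});

        // Modifying the returned copy in place: the change is lost.
        state.getTuple("accum")[2] = 42.0;
        System.out.println(state.getTuple("accum")[2]); // prints 0.0

        // Get, modify, then put back: the change is kept.
        double[] accum = state.getTuple("accum");
        accum[2] = 42.0;
        state.setTuple("accum", accum);
        System.out.println(state.getTuple("accum")[2]); // prints 42.0
    }
}
```

The second half of main mirrors the get, set, put-back pattern used with accumState and state.setTupleValue above.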

Below is a complete example UDF.

// sums up until the source changes, then adds the avg to the map
// IMPORTANT: table must be ordered by source
CREATE OR REPLACE FUNCTION average_by_source_1( state avg_type_1, source text, value double)
    CALLED ON NULL INPUT
    RETURNS avg_type_1
    LANGUAGE java
    AS $$

        TupleValue accumState = state.getTupleValue( "accum");
        String prevSource = accumState.getString( 0);

        // when no source yet, save the first source, set the count to 1, and set the value
        if (prevSource == null) {
            accumState.setString( 0, source);
            accumState.setInt( 1, 1);
            accumState.setDouble( 2, value);
            state.setTupleValue( "accum", accumState);
        }

        // when same source, increment the count and add the value
        else if (prevSource.equals( source)) {
            accumState.setInt( 1, accumState.getInt( 1) + 1);
            accumState.setDouble( 2, accumState.getDouble( 2) + value);
            state.setTupleValue( "accum", accumState);
        }

        // when different source, calc average and copy to map, then re-init accumulation
        else if (accumState.getInt( 1) > 0) {
            double avgVal = accumState.getDouble( 2) / accumState.getInt( 1);
            Map<String,Double> mapState = state.getMap( "avg_map", String.class, Double.class);
            mapState.put( prevSource, avgVal);
            state.setMap( "avg_map", mapState, String.class, Double.class);
            accumState.setString( 0, source);
            accumState.setInt( 1, 1);
            accumState.setDouble( 2, value);
            state.setTupleValue( "accum", accumState);
        }

        // should not happen - prev case uses "if" to avoid division by zero
        else {
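            Map<String,Double> mapState = state.getMap( "avg_map", String.class, Double.class);
            // NaN marks the error entry, since a CQL collection cannot hold a null value
            mapState.put( "ERROR: div by zero", Double.NaN);
            // the map must be put back into the state, just like the tuple
            state.setMap( "avg_map", mapState, String.class, Double.class);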
            accumState.setString( 0, source);
            accumState.setInt( 1, 1);
            accumState.setDouble( 2, value);
            state.setTupleValue( "accum", accumState);
        }

        // IMPORTANT: final function must calculate the average for the last source and
        //            add it to the map.

        return state;

    $$
;
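
The closing comment in the function above says that a final function has to handle the last source. A minimal sketch of such a final function might look like the following. The name average_by_source_final_1, the map<text,double> return type, and the assumption that the state passed in is never null are all choices made for this illustration, not part of the original answer.

```cql
// hypothetical final function: adds the average of the last source to the map
CREATE OR REPLACE FUNCTION average_by_source_final_1( state avg_type_1)
    CALLED ON NULL INPUT
    RETURNS map<text,double>
    LANGUAGE java
    AS $$

        // assumes state is not null
        TupleValue accumState = state.getTupleValue( "accum");
        Map<String,Double> mapState = state.getMap( "avg_map", String.class, Double.class);
        String lastSource = accumState.getString( 0);

        // the last source is still sitting in the accumulator, so flush it here
        if (lastSource != null && accumState.getInt( 1) > 0) {
            mapState.put( lastSource, accumState.getDouble( 2) / accumState.getInt( 1));
        }

        return mapState;

    $$
;
```

It would be attached through the FINALFUNC clause of CREATE AGGREGATE, with average_by_source_1 as the SFUNC. The aggregate definition itself, including its initial state, is not shown here.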