在使用 map 和元组值的 Cassandra UDA 函数中实例化元组值(用于计算每日平均值)

Instantiate tuple value in Cassandra UDA function with map and tuple value (for daily average)

我正在尝试创建一个函数,按天对值进行计数和求和(以便之后计算平均值)。目前我写到了这一步:

-- State function (first attempt) for the group_count_and_sum aggregate.
-- Parameters:
--   state     : map from a day (timestamp truncated to midnight) to a
--               tuple of (row count, running sum) for that day
--   timestamp : timestamp of the current row
--   value     : value of the current row
-- Returns the state map with the entry for the row's day updated.
-- NOTE: state.get(date) returns null for a day that is not yet in the map,
-- and the body dereferences that result without a null check, so the first
-- row of every new day raises a NullPointerException.
CREATE OR REPLACE FUNCTION state_group_count_and_sum( state map<timestamp, frozen<tuple<bigint,double>>>, timestamp timestamp, value double )
CALLED ON NULL INPUT
RETURNS map<timestamp, frozen<tuple<bigint,double>>>
LANGUAGE java AS '
Date date = (Date) timestamp;
Calendar cal = Calendar.getInstance(); // locale-specific
cal.setTime(date);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
date = cal.getTime();

TupleValue tupleValue = state.get(date);

Long count = (Long) tupleValue.getLong(0);
if (count == null) count = 1L;
else count = count + 1L;

Double sum = (Double) tupleValue.getDouble(1);
if (sum == null) sum = value;
else sum = sum + value;

//if (tupleValue == null) ?
tupleValue.setLong(0, count);
tupleValue.setDouble(1, sum);
state.put(date, tupleValue);
return state; ' ;

-- Aggregate over (timestamp, double) rows. Each row is folded into the state
-- by state_group_count_and_sum; the state starts as an empty map. No FINALFUNC
-- is declared, so the aggregate result is the state map itself.
CREATE OR REPLACE AGGREGATE group_count_and_sum (timestamp, double)
    SFUNC state_group_count_and_sum
    STYPE map<timestamp, frozen<tuple<bigint, double>>>
    INITCOND {};

这段代码会失败:每当遇到新的一天,该日期还不在 map 中,state.get(date) 返回 null,因此 tupleValue 为 null。如何在 UDA 中实例化一个元组值?

已修复:当 map 中还没有当天的条目时,先通过 TupleType 创建一个新的元组值(计数为 0、总和为 0.0),然后再累加。

-- State function for the group_count_and_sum aggregate.
-- Parameters:
--   state     : map from a day (timestamp truncated to midnight) to a
--               tuple of (row count, running sum) for that day
--   timestamp : timestamp of the current row; rows with a null timestamp
--               are skipped
--   value     : value of the current row; rows with a null value are skipped
-- Returns the state map with the entry for the row's day updated.
CREATE OR REPLACE FUNCTION state_group_count_and_sum( state map<timestamp, frozen<tuple<bigint,double>>>, timestamp timestamp, value double )
CALLED ON NULL INPUT
RETURNS map<timestamp, frozen<tuple<bigint,double>>>
LANGUAGE java AS '
// Defensive: the state is null if the aggregate is declared without an INITCOND.
if (state == null) state = new HashMap<>();

// CALLED ON NULL INPUT means either argument may be null. Such a row cannot
// be bucketed or summed, so leave the state unchanged instead of throwing.
if (timestamp == null || value == null) return state;

// Truncate the timestamp to the start of its day. Calendar.getInstance()
// uses the default time zone and locale of the JVM running the function.
Date date = (Date) timestamp;
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
date = cal.getTime();

// First row seen for this day: start from a (count = 0, sum = 0.0) tuple.
TupleValue tupleValue = state.get(date);
if (tupleValue == null) {
com.datastax.driver.core.TupleType tupleType = com.datastax.driver.core.TupleType.of(com.datastax.driver.core.ProtocolVersion.NEWEST_SUPPORTED, com.datastax.driver.core.CodecRegistry.DEFAULT_INSTANCE, com.datastax.driver.core.DataType.bigint(), com.datastax.driver.core.DataType.cdouble());
tupleValue = tupleType.newValue(0L, 0.0);
}

// getLong and getDouble return primitives, so no null checks are needed here.
tupleValue.setLong(0, tupleValue.getLong(0) + 1L);
tupleValue.setDouble(1, tupleValue.getDouble(1) + value);
state.put(date, tupleValue);
return state; ' ;

-- Aggregate over (timestamp, double) rows. Each row is folded into the state
-- by state_group_count_and_sum; the state starts as an empty map. No FINALFUNC
-- is declared, so the aggregate result is the state map itself.
CREATE OR REPLACE AGGREGATE group_count_and_sum (timestamp, double)
    SFUNC state_group_count_and_sum
    STYPE map<timestamp, frozen<tuple<bigint, double>>>
    INITCOND {};