获取未指定的值参数scala Kafka
Getting unspecified value parameter scala Kafka
您好,我遇到了这个编译错误 -
Unspecified Value parameters: aggregator: (String, String, NotInferedVR) => NotInferedVR
但我显然已经有了一个聚合器。有谁知道怎么回事吗?
val stream = builder
.stream(inputTopic)(Consumed.`with`(Serdes.String(), Serdes.ByteArray()))
.map{ (key: String, value: Array[Byte]) =>
println(s"key = ${key}")
val lv = GroupByAction.convertByteArrayToJsonObject(value)
val lst = List.empty[String]
val newKey = GroupByAction
.reKey(lv
, groupByColumnList
.asScala
.toList
,lst)
val newValue = getValFromJSONMessage(lv, aggregateColumnList.asScala.toList.head)
println(s"newKey = ${newKey}")
(newKey, newValue)}
.groupByKey(Serialized.`with`(Serdes.String(), Serdes.String()))
.aggregate{ () => 0.toString, (k,v,vr: Long) => (v.toString.toLong + vr.toString.toLong).toString }
您只能使用 {}
形式的方法调用来传递单个参数,该参数被视为一个块。您需要传递两个参数,因此请改用 ()
:
aggregate( () => 0.toString, (k,v,vr: Long) => (v.toString.toLong + vr.toString.toLong).toString )
您好,我遇到了这个编译错误 -
Unspecified Value parameters: aggregator: (String, String, NotInferedVR) => NotInferedVR
但我显然已经有了一个聚合器。有谁知道怎么回事吗?
val stream = builder
.stream(inputTopic)(Consumed.`with`(Serdes.String(), Serdes.ByteArray()))
.map{ (key: String, value: Array[Byte]) =>
println(s"key = ${key}")
val lv = GroupByAction.convertByteArrayToJsonObject(value)
val lst = List.empty[String]
val newKey = GroupByAction
.reKey(lv
, groupByColumnList
.asScala
.toList
,lst)
val newValue = getValFromJSONMessage(lv, aggregateColumnList.asScala.toList.head)
println(s"newKey = ${newKey}")
(newKey, newValue)}
.groupByKey(Serialized.`with`(Serdes.String(), Serdes.String()))
.aggregate{ () => 0.toString, (k,v,vr: Long) => (v.toString.toLong + vr.toString.toLong).toString }
您只能使用 {}
形式的方法调用来传递单个参数,该参数被视为一个块。您需要传递两个参数,因此请改用 ()
:
aggregate( () => 0.toString, (k,v,vr: Long) => (v.toString.toLong + vr.toString.toLong).toString )