Type mismatch when calling a generic Java method from Scala code
I have n Java classes and one superclass, the data model. A list of these classes is the input parameter of a Scala method in which I want to build resultStreams, and from that process method I have to call a generic Java method. How can I solve this?
I tried using [_ <: SpecificRecordBase] and [SpecificRecordBase] in the method call, but the result is the same.
The error:
Error:(146, 88) type mismatch;
found : Class[_] where type _ <: org.apache.avro.specific.SpecificRecordBase
required: Class[org.apache.avro.specific.SpecificRecordBase]
Note: _ <: org.apache.avro.specific.SpecificRecordBase, but Java-defined class Class is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: org.apache.avro.specific.SpecificRecordBase`. (SLS 3.2.10)
AvroHelper.deSerializeAvroObject(record.value, cl))(TypeInformation.of(cl)))
Scala code:
object GenerickRunnerWhosebug {
  def process(inputClasses: List[Class[_ <: SpecificRecordBase]]): Unit = {
    val newStream: DataStream[KafkaSourceType] = env.addSource(...).uid(...).filter(...)
    val resultStreams = inputClasses.map(cl =>
      newStream.map(record =>
        AvroHelper.deSerializeAvroObject(record.value, cl))(TypeInformation.of(cl)))
    ...
  }

  def main(args: Array[String]): Unit = {
    val topicToClasses: List[Class[_ <: SpecificRecordBase]] =
      List(Types.RECORD_1.getClassType, Types.RECORD_2.getClassType)
    process(topicToClasses)
  }
}
Java method signature:
public static <A extends SpecificRecord> A deSerializeAvroObject(byte[] object, Class<A> clazz){ ...}
Models:
public class Record1 extends SpecificRecordBase {}
public class Record2 extends SpecificRecordBase {}
...
public enum Types {
    RECORD_1(Record1.class),
    RECORD_2(Record2.class);
    ....
    private Class<? extends SpecificRecordBase> clazz;
    public Class<? extends SpecificRecordBase> getClassType() { return this.clazz; }
}
I also get an error with the same message from the Scala addSink method:
def addSink(sinkFunction : org.apache.flink.streaming.api.functions.sink.SinkFunction[T]) : org.apache.flink.streaming.api.datastream.DataStreamSink[T] = { /* compiled code */ }
I wrote a wrapper method:
def addSinkWithSpecificRecordBase[A <: SpecificRecordBase](
    stream: DataStream[A],
    sink: BucketingSink[A]): DataStreamSink[A] = stream.addSink(sink)
and call it like this:
val result = topicToSinkStream.foreach { el =>
  val stream: DataStream[_ <: SpecificRecordBase] = el._2._1
  val sink: BucketingSink[_ <: SpecificRecordBase] = el._2._2
  addSinkWithSpecificRecordBase(stream, sink)
}
which gives the error:
Error:(209, 37) type mismatch;
found : org.apache.flink.streaming.api.scala.DataStream[_] where type _ <: org.apache.avro.specific.SpecificRecordBase
required: org.apache.flink.streaming.api.scala.DataStream[org.apache.avro.specific.SpecificRecordBase]
Note: _ <: org.apache.avro.specific.SpecificRecordBase, but class DataStream is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
addSinkWithSpecificRecordBase(stream, sink)
where topicToSinkStream is:
Map[String, (DataStream[_ <: SpecificRecordBase], BucketingSink[_ <: SpecificRecordBase])]
I also tried removing SpecificRecordBase from the method's generic bound, and adding + and - variance annotations in the method parameter declarations, but with no result.
The problem is that AvroHelper.deSerializeAvroObject(record.value, cl) gets the type SpecificRecordBase (_ <: SpecificRecordBase is only allowed as a type argument and is not available there). The fix is to extract a helper function:
def processClass[A <: SpecificRecordBase](cl: Class[A], newStream: DataStream[KafkaSourceType]) =
  newStream.map(record => AvroHelper.deSerializeAvroObject(record.value, cl))(TypeInformation.of(cl))
(If you define it locally, you can also use newStream directly instead of passing it as a parameter.) And then:
val resultStreams = inputClasses.map(cl => processClass(cl, newStream))
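To make the capture mechanics concrete, here is a minimal, dependency-free sketch of the same situation. All names in it (Base, Rec1, Rec2, Info, SimpleStream, WildcardCaptureDemo, deserialize, processClass) are placeholders standing in for SpecificRecordBase, the generated record classes, TypeInformation, DataStream and AvroHelper; none of this is Flink or Avro API. The commented-out inline call fails in the same way as the snippet from the question, while the helper with a real type parameter compiles and runs:

class Base
class Rec1 extends Base { override def toString = "Rec1" }
class Rec2 extends Base { override def toString = "Rec2" }

// Invariant wrapper playing the role of TypeInformation[T].
final case class Info[T](clazz: Class[T])
object Info { def of[T](clazz: Class[T]): Info[T] = Info(clazz) }

// Stand-in for DataStream[KafkaSourceType]: map takes the "type info"
// in a second parameter list, as the Flink Scala API does implicitly.
final class SimpleStream(records: List[Array[Byte]]) {
  def map[R](f: Array[Byte] => R)(info: Info[R]): List[R] = records.map(f)
}

object WildcardCaptureDemo {

  // Stand-in for AvroHelper.deSerializeAvroObject: needs a Class[A] for a concrete A.
  def deserialize[A <: Base](bytes: Array[Byte], clazz: Class[A]): A =
    clazz.getDeclaredConstructor().newInstance()

  // Inline use of a wildcarded Class fails like in the question: the lambda's
  // result type is widened to Base, so Info.of(cl) is then expected to produce
  // an Info[Base], but cl is only a Class[_ <: Base].
  //
  //   classes.map(cl => stream.map(b => deserialize(b, cl))(Info.of(cl)))  // type mismatch
  //
  // The helper introduces a real type parameter A: the wildcard is captured once,
  // and cl, the deserialized values and the Info all agree on that A.
  def processClass[A <: Base](cl: Class[A], stream: SimpleStream): List[A] =
    stream.map(b => deserialize(b, cl))(Info.of(cl))

  def main(args: Array[String]): Unit = {
    val stream = new SimpleStream(List.fill(2)(Array.emptyByteArray))
    val classes: List[Class[_ <: Base]] = List(classOf[Rec1], classOf[Rec2])
    val results = classes.map(cl => processClass(cl, stream))
    println(results) // List(List(Rec1, Rec1), List(Rec2, Rec2))
  }
}

The addSink error from the second half of the question has the same root cause plus one extra wrinkle: in the tuple (DataStream[_ <: SpecificRecordBase], BucketingSink[_ <: SpecificRecordBase]) the two wildcards are independent, so the compiler cannot prove that the stream and the sink agree on one element type even when calling the wrapper. A possible workaround (again only a sketch with the placeholder types above, not part of this answer's original text) is to keep each stream together with its sink under a single type parameter, for example in a small case class, so that one wildcard covers the whole pair; with the real types the case class would hold a DataStream[A] and a BucketingSink[A]:

// Both fields share the single parameter A, so a value typed
// StreamWithSink[_ <: Base] captures one wildcard for the whole pair.
final case class StreamWithSink[A <: Base](records: List[A], sink: A => Unit)

object PairedSinkDemo {
  // Same helper trick as above: A is captured once per pair.
  def attach[A <: Base](p: StreamWithSink[A]): Unit = p.records.foreach(p.sink)

  def main(args: Array[String]): Unit = {
    val topicToSinkStream: Map[String, StreamWithSink[_ <: Base]] = Map(
      "topic1" -> StreamWithSink[Rec1](List(new Rec1), r => println(s"sink1 got $r")),
      "topic2" -> StreamWithSink[Rec2](List(new Rec2), r => println(s"sink2 got $r"))
    )
    topicToSinkStream.values.foreach(p => attach(p))
  }
}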