How to use ValueMapper to change the value type in Kafka Streams 0.10.2 using Scala
I am trying to upgrade a Scala-based Kafka Streams application from 0.10.0.0 to 0.10.2.1, but I cannot work out how to get it to compile.
The example I found in the documentation uses mapValues, but it does not change the value's type. I am using Scala 2.11 with the -Xexperimental compiler flag.
Code
class MyStream() {
  def startMyStream(): Unit = {
    val kStreamBuilder = new KStreamBuilder
    val kStream = kStreamBuilder.stream("myTopic")
    kStream.mapValues(new ValueMapper[AnyRef, Double]() {
      override def apply(value: Any) = 6.3
    })
    val kafkaStreams = new KafkaStreams(kStreamBuilder, new Properties)
    kafkaStreams.start()
  }
}
Compiler error
no type parameters for method mapValues: (x: org.apache.kafka.streams.kstream.ValueMapper[_, _ <: VR])org.apache.kafka.streams.kstream.KStream[Nothing,VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{})
--- because ---
argument expression's type is not compatible with formal parameter type;
found : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]
required: org.apache.kafka.streams.kstream.ValueMapper[_, _ <: ?VR]
Note: AnyRef <: Any, but Java-defined trait ValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
kStream.mapValues(new ValueMapper[AnyRef, Double]() {
^
type mismatch;
found : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{}
required: org.apache.kafka.streams.kstream.ValueMapper[_, _ <: VR]
kStream.mapValues(new ValueMapper[AnyRef, Double]() {
^
two errors found
Written as a Java class, this compiles fine.
public class MyStream {
    public void startMyStream() {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream kStream = kStreamBuilder.stream("myTopic");
        kStream.mapValues(new ValueMapper<Object, Double>() {
            @Override
            public Double apply(Object value) {
                return 6.3;
            }
        });
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, new Properties());
        kafkaStreams.start();
    }
}
How do I get the Scala version to compile?
Solution, based on the answer
Three of the approaches work and two do not.
import java.util.Properties
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, ValueMapper}

class MyStream() {
  def startMyStream(): Unit = {
    val kStreamBuilder = new KStreamBuilder
    // Explicit typing here is not required to compile and run.
    val kStream: KStream[Array[Byte], String] = kStreamBuilder.stream("myTopic")
    // Does not compile
    kStream.mapValues(new ValueMapper[AnyRef, Double]() {
      override def apply(value: AnyRef) = 6.3
    })
    // Does not compile
    kStream.mapValues(_ => 6.3)
    // Works
    kStream.mapValues[Double](new ValueMapper[AnyRef, Double]() {
      override def apply(value: AnyRef) = 6.3
    })
    // Works, requires compiler option -Xexperimental
    kStream.mapValues[Double](_ => 6.3)
    // Works, requires compiler option -Xexperimental
    kStream.mapValues[Double](convert)
    def convert(string: String): Double = 6.3
    val kafkaStreams = new KafkaStreams(kStreamBuilder, new Properties)
    kafkaStreams.start()
  }
}
Update: solutions that do not work
Attempt 1: Adding an explicit type to kStream as suggested (val kStream: KStream[Array[Byte], String] = kStreamBuilder.stream("myTopic")) still fails to compile and results in this error.
no type parameters for method mapValues: (x: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: VR])org.apache.kafka.streams.kstream.KStream[Array[Byte],VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{})
--- because ---
argument expression's type is not compatible with formal parameter type;
found : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]
required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: ?VR]
Note: AnyRef <: Any, but Java-defined trait ValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
kStream.mapValues(new ValueMapper[AnyRef, Double]() {
^
type mismatch;
found : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{}
required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: VR]
kStream.mapValues(new ValueMapper[AnyRef, Double]() {
Attempt 2: Adding the above and also using SAM conversion "to avoid explicitly writing out an instantiation of an anonymous class" (kStream.mapValues(_ => 6.3)) results in this compiler error.
no type parameters for method mapValues: (x: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: VR])org.apache.kafka.streams.kstream.KStream[Array[Byte],VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable)
--- because ---
argument expression's type is not compatible with formal parameter type;
found : org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable
required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: ?VR]
Note: String <: Any (and org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable <: org.apache.kafka.streams.kstream.ValueMapper[String,Double]), but Java-defined trait ValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
kStream.mapValues(_ => 6.3)
^
type mismatch;
found : org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable
required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String, _ <: VR]
kStream.mapValues(_ => 6.3)
^
There are two problems here. First, you did not specify the types of the stream:
val kStream: KStream[Array[Byte], String] = kStreamBuilder.stream("myTopic")
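This is in fact the cause of the error you are seeing: because you did not specify the types, Scala infers defaults for them (Nothing, as the KStream[Nothing,VR] in the error message shows), which is almost never what you want.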
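To illustrate the inference behaviour described above without pulling in Kafka, here is a minimal sketch. TopicStream and TopicBuilder are hypothetical stand-ins, not Kafka classes; they only mimic the shape of a generic factory method whose type parameters appear solely in its result type, which is assumed to be how kStreamBuilder.stream is declared.

```scala
// Hypothetical stand-ins for illustration only; these are NOT the Kafka classes.
final class TopicStream[K, V]

object TopicBuilder {
  // K and V appear only in the result type, so the argument gives
  // the compiler no information about them.
  def stream[K, V](topic: String): TopicStream[K, V] = new TopicStream[K, V]
}

object InferenceDemo {
  // No type annotation: nothing constrains K and V, so both are inferred as Nothing.
  val untyped = TopicBuilder.stream("myTopic")

  // TopicStream is invariant, so this line compiles only because the
  // inferred type of `untyped` is exactly TopicStream[Nothing, Nothing].
  val proof: TopicStream[Nothing, Nothing] = untyped

  // With an annotation, the expected type drives inference of K and V.
  val typed: TopicStream[Array[Byte], String] = TopicBuilder.stream("myTopic")

  def main(args: Array[String]): Unit =
    println("both declarations compiled")
}
```

Passing the type arguments explicitly, as in TopicBuilder.stream[Array[Byte], String]("myTopic"), should have the same effect as annotating the val.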
Second, since you have -Xexperimental enabled, you can rely on SAM conversion to avoid explicitly writing out an instantiation of an anonymous class:
kStream.mapValues(_ => 6.3)
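Update: It seems that, for some reason, the Scala compiler cannot correctly infer the output type of the anonymous function/SAM instance here. With the following small tweak I was able to compile the code successfully: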
kStream.mapValues[Double](_ => 6.3)
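The explicit type argument can be tried out without a Kafka dependency using the rough sketch below. Mapper and MiniStream are hypothetical stand-ins whose mapValues copies the wildcard shape shown in the compiler error (ValueMapper[_ >: String, _ <: VR]); they are not the Kafka API. The sketch assumes Scala 2.12 or later, where SAM conversion is enabled by default; on Scala 2.11 the lambda form would need -Xexperimental, as noted above. It only demonstrates the forms reported to work and does not try to reproduce the failing ones.

```scala
// Hypothetical stand-in for a Java-style mapper interface; invariant in V and VR.
trait Mapper[V, VR] {
  def apply(value: V): VR
}

// Hypothetical in-memory miniature of a stream; NOT the Kafka KStream.
final class MiniStream[K, V](val values: List[V]) {
  // Same wildcard shape as in the error message:
  // the mapper may accept any supertype of V and return any subtype of VR.
  def mapValues[VR](mapper: Mapper[_ >: V, _ <: VR]): MiniStream[K, VR] =
    new MiniStream[K, VR](values.map(v => mapper.apply(v)))
}

object ExplicitTypeArgDemo {
  val stream = new MiniStream[Array[Byte], String](List("a", "b"))

  // Explicit [Double] plus an anonymous class: VR is fixed up front,
  // so Mapper[AnyRef, Double] fits Mapper[_ >: String, _ <: Double].
  val viaClass: MiniStream[Array[Byte], Double] =
    stream.mapValues[Double](new Mapper[AnyRef, Double] {
      override def apply(value: AnyRef): Double = 6.3
    })

  // Explicit [Double] plus a lambda, relying on SAM conversion.
  val viaLambda: MiniStream[Array[Byte], Double] =
    stream.mapValues[Double](_ => 6.3)

  def main(args: Array[String]): Unit = {
    println(viaClass.values)
    println(viaLambda.values)
  }
}
```

Supplying [Double] means the compiler no longer has to solve for VR and the wildcard bounds at the same time, which appears to be the step that fails in the errors quoted in the question.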