卡夫卡流中的自定义时间戳提取器
custom timestamp extractor in kafka streams
我正在尝试使用 kafka 流来处理来自 kafka 主题的一些数据。数据来自 kafka 0.11.0 写入的 kafka 主题,其中没有嵌入时间戳。在互联网上阅读一些内容后,我开始明白我可以通过在自定义 class 中扩展 TimestampExtractor
class 并将其传递到 StreamsConfig
来解决这个问题。
我是这样做的--
class MyEventTimestampExtractor extends TimestampExtractor {
override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: Long) = {
record.value() match {
case w: String => 1000L
case _ => throw new RuntimeException(s"Called for $record")
}
}
}
但是,当我执行 sbt run
时出现此错误
[error] /home/someuser/app/blahblah/src/main/scala/main.scala:34: class MyEventTimestampExtractor needs to be abstract, since method extract in trait TimestampExtractor of type (x: org.apache.kafka.clients.consumer.ConsumerRecord[Object,Object], x: Long)Long is not defined
[error] (Note that Long does not match Long)
[error] class MyEventTimestampExtractor extends TimestampExtractor {
[error] ^
[error] /home/someuser/app/blahblah/src/main/scala/main.scala:35: method extract overrides nothing.
[error] Note: the super classes of class MyEventTimestampExtractor contain the following, non final members named extract:
[error] def extract(x: org.apache.kafka.clients.consumer.ConsumerRecord[Object,Object],x: Long): Long
[error] override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: Long): Long = {
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
我的build.sbt文件是这个--
name := "kafka streams experiment"
version := "1.0"
scalaVersion := "2.12.4"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-streams" % "1.0.0"
)
我不太明白这个错误。特别是 Note that Long does not match Long
周围的部分。我做错了什么?谢谢!
尝试使用(观察 prev
函数参数的类型:
override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: java.lang.Long) = {
您需要 java.Long,因为此 API 是在 Java 中定义的,而您使用的是 Scala Long
我正在尝试使用 kafka 流来处理来自 kafka 主题的一些数据。数据来自 kafka 0.11.0 写入的 kafka 主题,其中没有嵌入时间戳。在互联网上阅读一些内容后,我开始明白我可以通过在自定义 class 中扩展 TimestampExtractor
class 并将其传递到 StreamsConfig
来解决这个问题。
我是这样做的--
class MyEventTimestampExtractor extends TimestampExtractor {
override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: Long) = {
record.value() match {
case w: String => 1000L
case _ => throw new RuntimeException(s"Called for $record")
}
}
}
但是,当我执行 sbt run
[error] /home/someuser/app/blahblah/src/main/scala/main.scala:34: class MyEventTimestampExtractor needs to be abstract, since method extract in trait TimestampExtractor of type (x: org.apache.kafka.clients.consumer.ConsumerRecord[Object,Object], x: Long)Long is not defined
[error] (Note that Long does not match Long)
[error] class MyEventTimestampExtractor extends TimestampExtractor {
[error] ^
[error] /home/someuser/app/blahblah/src/main/scala/main.scala:35: method extract overrides nothing.
[error] Note: the super classes of class MyEventTimestampExtractor contain the following, non final members named extract:
[error] def extract(x: org.apache.kafka.clients.consumer.ConsumerRecord[Object,Object],x: Long): Long
[error] override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: Long): Long = {
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
我的build.sbt文件是这个--
name := "kafka streams experiment"
version := "1.0"
scalaVersion := "2.12.4"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-streams" % "1.0.0"
)
我不太明白这个错误。特别是 Note that Long does not match Long
周围的部分。我做错了什么?谢谢!
尝试使用(观察 prev
函数参数的类型:
override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: java.lang.Long) = {
您需要 java.Long,因为此 API 是在 Java 中定义的,而您使用的是 Scala Long