Why do I get this compilation error: "could not find implicit value for kstream.Consumed" and how could I fix it?
Why do I get this compilation error: "could not find implicit value for kstream.Consumed" and how could I fix it?
我们有这些依赖关系:
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion
libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % confluentVersion
libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "com.typesafe" % "config" % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4"
我们使用代码生成器从 AVRO 模式文件中生成 Scala 案例 classes。一个这样生成的案例 class 具有一个 Either 值作为其字段之一。在 AVRO 模式中,这是用 type=[t1,t2] 表示的,所以这一代似乎是体面的,这是一个总和类型:可以是类型 t1 或类型 t2。
问题变成了从主题到案例 class(二进制 -> Avro 映射 -> 案例 class)的反序列化路径中缺少什么。
基本上我目前收到这个错误:
could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error] .stream[String, UserEvent]("schma.avsc")
第一个想到的是kafka-streams-avro-serde,但可能是这个库只确保了AVRO Map的Serde[GenericRecord],而不是case classes。因此,其他依赖项之一是帮助 AVRO GenericRecord 到 case classes 映射并返回。我们还有一些手写代码可以从模式中生成案例 classes,这似乎可以直接与 spray json.
一起使用
我在想,在 (binary <-> Avro GenericRecord <-> case class instance) 转换中,存在差距,这可能是 class 有一个 Either 字段吗?
我现在正在尝试创建一个 Serde[UserEvent] 实例。因此,根据我的理解,将涉及在 UserEvent 和 AVRO GenericRecord 之间进行转换,类似于 Map,然后在 AVRO Record 和二进制文件之间进行转换——这可能被 kafka-streams-avro-serde 依赖项所涵盖,就像应该有一个 Serde[GenericRecord]或类似的。
导入明智,我们有这个来导入隐式:
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed
是否导入了对应的包?
import org.apache.kafka.streams.scala.ImplicitConversions._
比照。 https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#scala-dsl
实际上缺少导入。
现在可以编译了。
以下是进口商品:
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
对我来说,我必须更好地遵循 the directions,并添加一个隐式 serde 实现。他们在 link 中的示例如下所示:
// An implicit Serde implementation for the values we want to
// serialize as avro
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde
有关更完整的示例,请参阅 scala tests for their avro lib:
// Make an implicit serde available for GenericRecord, which is required for operations such as `to()` below.
implicit val genericAvroSerde: Serde[GenericRecord] = {
val gas = new GenericAvroSerde
val isKeySerde: Boolean = false
gas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
gas
}
我们有这些依赖关系:
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion
libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % confluentVersion
libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "com.typesafe" % "config" % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4"
我们使用代码生成器从 AVRO 模式文件中生成 Scala 案例 classes。一个这样生成的案例 class 具有一个 Either 值作为其字段之一。在 AVRO 模式中,这是用 type=[t1,t2] 表示的,所以这一代似乎是体面的,这是一个总和类型:可以是类型 t1 或类型 t2。
问题变成了从主题到案例 class(二进制 -> Avro 映射 -> 案例 class)的反序列化路径中缺少什么。
基本上我目前收到这个错误:
could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error] .stream[String, UserEvent]("schma.avsc")
第一个想到的是kafka-streams-avro-serde,但可能是这个库只确保了AVRO Map的Serde[GenericRecord],而不是case classes。因此,其他依赖项之一是帮助 AVRO GenericRecord 到 case classes 映射并返回。我们还有一些手写代码可以从模式中生成案例 classes,这似乎可以直接与 spray json.
一起使用我在想,在 (binary <-> Avro GenericRecord <-> case class instance) 转换中,存在差距,这可能是 class 有一个 Either 字段吗?
我现在正在尝试创建一个 Serde[UserEvent] 实例。因此,根据我的理解,将涉及在 UserEvent 和 AVRO GenericRecord 之间进行转换,类似于 Map,然后在 AVRO Record 和二进制文件之间进行转换——这可能被 kafka-streams-avro-serde 依赖项所涵盖,就像应该有一个 Serde[GenericRecord]或类似的。
导入明智,我们有这个来导入隐式:
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed
是否导入了对应的包?
import org.apache.kafka.streams.scala.ImplicitConversions._
比照。 https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#scala-dsl
实际上缺少导入。 现在可以编译了。 以下是进口商品:
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
对我来说,我必须更好地遵循 the directions,并添加一个隐式 serde 实现。他们在 link 中的示例如下所示:
// An implicit Serde implementation for the values we want to
// serialize as avro
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde
有关更完整的示例,请参阅 scala tests for their avro lib:
// Make an implicit serde available for GenericRecord, which is required for operations such as `to()` below.
implicit val genericAvroSerde: Serde[GenericRecord] = {
val gas = new GenericAvroSerde
val isKeySerde: Boolean = false
gas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
gas
}