Why do I get this compilation error: "could not find implicit value for kstream.Consumed" and how could I fix it?

Why do I get this compilation error: "could not find implicit value for kstream.Consumed" and how could I fix it?

我们有这些依赖关系:

libraryDependencies += "org.apache.kafka"       %% "kafka-streams-scala"         % kafkaVersion
libraryDependencies += "io.confluent"           % "kafka-streams-avro-serde"     % confluentVersion
libraryDependencies += "io.confluent"           % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback"         % "logback-classic"              % "1.2.3"
libraryDependencies += "com.typesafe"           % "config"                       % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s"    %% "avro4s-core"                 % "3.0.4"

我们使用代码生成器从 AVRO 模式文件中生成 Scala 案例 classes。一个这样生成的案例 class 具有一个 Either 值作为其字段之一。在 AVRO 模式中,这是用 type=[t1,t2] 表示的,所以这一代似乎是体面的,这是一个总和类型:可以是类型 t1 或类型 t2。

问题变成了从主题到案例 class(二进制 -> Avro 映射 -> 案例 class)的反序列化路径中缺少什么。

基本上我目前收到这个错误:

could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error]       .stream[String, UserEvent]("schma.avsc")

第一个想到的是kafka-streams-avro-serde,但可能是这个库只确保了AVRO Map的Serde[GenericRecord],而不是case classes。因此,其他依赖项之一是帮助 AVRO GenericRecord 到 case classes 映射并返回。我们还有一些手写代码可以从模式中生成案例 classes,这似乎可以直接与 spray json.

一起使用

我在想,在 (binary <-> Avro GenericRecord <-> case class instance) 转换中,存在差距,这可能是 class 有一个 Either 字段吗?

我现在正在尝试创建一个 Serde[UserEvent] 实例。因此,根据我的理解,将涉及在 UserEvent 和 AVRO GenericRecord 之间进行转换,类似于 Map,然后在 AVRO Record 和二进制文件之间进行转换——这可能被 kafka-streams-avro-serde 依赖项所涵盖,就像应该有一个 Serde[GenericRecord]或类似的。

导入明智,我们有这个来导入隐式:


import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed

是否导入了对应的包?

import org.apache.kafka.streams.scala.ImplicitConversions._

比照。 https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#scala-dsl

实际上缺少导入。 现在可以编译了。 以下是进口商品:

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

对我来说,我必须更好地遵循 the directions,并添加一个隐式 serde 实现。他们在 link 中的示例如下所示:

// An implicit Serde implementation for the values we want to
// serialize as avro
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde

有关更完整的示例,请参阅 scala tests for their avro lib:

    // Make an implicit serde available for GenericRecord, which is required for operations such as `to()` below.
    implicit val genericAvroSerde: Serde[GenericRecord] = {
      val gas = new GenericAvroSerde
      val isKeySerde: Boolean = false
      gas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
      gas
    }