在没有 Confluent 组件的情况下从 Kafka 生成和使用 Avro 消息

Producing and Consuming Avro messages from Kafka without Confluent components

我正在尝试找到一个示例,我可以在其中生成和订阅来自 kafka 的 avro 消息。

此时,我想使用 "vanilla" kafka 部署,没有任何 confluent 附加组件。

这可能吗?到目前为止,我发现的所有示例都很快开始使用 avro 消息的融合特定工具。

我确信应该有一种方法可以让我在没有任何插件的情况下仅在 kafka 平台上发布和使用 avro 消息'distribution specific'。

当然,您可以在没有任何 Confluent 工具的情况下做到这一点。但是你必须在你这边做额外的工作(例如在你的应用程序代码中)——这是提供 Avro 相关工具的最初动机,比如你提到的来自 Confluent 的工具。

一种选择是直接使用 Apache Avro Java API 手动 serialize/deserialize Kafka 消息的负载(例如从 YourJavaPojobyte[])。 (我想你暗示 Java 作为选择的编程语言。)这会是什么样子?这是一个例子。

  • 首先,您将在将数据写入 Kafka 的应用程序中手动序列化数据负载。在这里,您可以使用 Avro 序列化 API 对有效载荷进行编码(从 Java pojo 到 byte[]),然后使用 Kafka 的 Java 生产者客户端来写入编码后的有效载荷到 Kafka 主题。
  • 然后,在数据管道的下游,您将在另一个从 Kafka 读取数据的应用程序中反序列化。在这里,您可以使用 Kafka 的 Java 消费者客户端从同一 Kafka 主题读取(编码的)数据,并使用 Avro 反序列化 API 再次解码有效负载(来自 byte[] 到 Java pojo)。

您也可以直接使用 Avro API,当然,在使用 Kafka Streams(将包含在即将推出的 Apache Kafka 0.10 中)或 Apache Storm 等流处理工具时。

最后,您还可以选择使用一些实用程序库(无论是来自 Confluent 还是其他地方),这样您就不必直接使用 Apache Avro API。值得一提的是,我在 kafka-storm-starter, e.g. as demonstrated by AvroDecoderBolt.scala. Here, the Avro serialization/deserialization is done by using the Scala library Twitter Bijection 上发布了一些稍微复杂一些的示例。以下是 AvroDecoderBolt.scala 的示例片段,可为您提供总体思路:

  // This tells Bijection how to automagically deserialize a Java type `T`,
  // given a byte array `byte[]`.
  implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]

  // Let's put Bijection to use.
  private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) {
    require(bytes != null, "bytes must not be null")
    val decodeTry = Injection.invert(bytes)  // <-- deserialization, using Twitter Bijection, happens here
    decodeTry match {
      case Success(pojo) =>
        log.debug("Binary data decoded into pojo: " + pojo)
        collector.emit(new Values(pojo)) // <-- Here we are telling Storm to send the decoded payload to downstream consumers
        ()
      case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
    }
  }

所以是的,您当然可以选择不使用任何其他库,例如 Confluent 的 Avro serializers/deserializers(目前作为 confluentinc/schema-registry) or Twitter's Bijection 的一部分提供)。是否值得额外的努力取决于你来决定。