Spring Cloud Stream Kafka with Confluent 生成的消息与 Spring Kafka with Confluent 生成的消息不同

Spring Cloud Stream Kafka with Confluent is not producing same message as Spring Kafka with Confluent

我想将 Spring Cloud Stream Kafka 用于我的 Java/Spring 服务,我需要生成 Confluent 序列化消息,因为我有 .NET 和 NodeJS 客户端使用 Confluent API 来使用我的消息.

据我们所知,Spring 带有 Confluent 序列化器的 Kafka 正在为我们工作,而 Spring 带有 Confluent 序列化器的 Cloud Stream Kafka 给我们带来了问题。

为了展示我在这 2 种情况下可以看到的差异,我在 GitHub 上创建了 2 个示例存储库,其中仅包含在两种情况下生成简单消息所需的代码。

  1. With Spring Kakfa 和 Confluent https://github.com/donalthurley/springKafkaAvro

  2. 使用 Spring Cloud Stream Kafka 和 Confluent https://github.com/donalthurley/springCloudKafkaAvro

我想我已经为 Spring 云应用程序正确配置了带有 useNativeEncoding 标志的配置设置和 confluent 序列化器配置,这些可以在此处的源代码中看到 https://github.com/donalthurley/springCloudKafkaAvro/blob/master/src/main/resources/application.yaml#L8

      kafka:
        binder:
          useNativeEncoding: true
          brokers: 127.0.0.1:9092
        bindings:
          output:
            producer:
              configuration:
                schema.registry.url: http://127.0.0.1:8081
                key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

我从 Spring Kafka 应用程序和 Spring Cloud Stream Kafka 应用程序发送相同的简单消息,日志显示。

Producing Kafka person event: {"lastName": "Doe", "firstName": "John"}

当我在 docker Kafka 环境中使用 Kafka Topics UI 浏览器时,请参阅 https://hub.docker.com/r/landoop/fast-data-dev/,并查看消息原始数据,这两种情况都不同。

对于 Spring Kafka 来说看起来更正确,因为浏览器会识别并显示消息值中的字段。

[
  {
    "topic": "test_spring_kafka",
    "key": "3197449393600061094",
    "value": {
      "lastName": "Doe",
      "firstName": "John"
    },
    "partition": 0,
    "offset": 0
  }
]

在Spring Cloud Stream Kafka原始数据中,浏览器无法识别显示消息不相同的字段。

[
  {
    "topic": "test_spring_cloud_kafka",
    "key": "-6214497758709596999",
    "value": "\u0006Doe\bJohn",
    "partition": 0,
    "offset": 0
  }
]

我认为使用 Spring Cloud Stream Kafka 生成 Confluent 消息可能存在问题,并且 Spring Kafka 实现正确地生成了它们,但也许我在实现中遗漏了一些东西和一些有人可以帮我解决这个问题吗?

问题出在您配置 useNativeEncoding 的方式上。它没有生效。此配置应该有效:

spring:
  application:
    name: springCloudKafkaAvro
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://127.0.0.1:8081
      kafka:
        binder:
            brokers: 127.0.0.1:9092
        bindings:
          output:
            producer:
              configuration:
                schema.registry.url: http://127.0.0.1:8081
                key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      bindings:
        output:
          destination: test_spring_cloud_kafka
          producer:
            useNativeEncoding: true

请注意 useNativeEncoding 是如何从您的原始配置中重新排列的。