org.apache.kafka.common.errors.SerializationException:未知的魔法字节

org.apache.kafka.common.errors.SerializationException: Unknown magic byte

我正在尝试使用 lenses.io 反应器使用从 MQTT 到 Kafka 的消息。最新版本的 Stream Reactor

Kafka/Confluent版本

sh-4.4$ kafka-topics --version
7.1.0-ccs (Commit:c86722379ab997cc)
kafka-connect-mqtt-3.0.1-2.5.0-all.jar

预期行为:avro 主题应该打印在控制台上

org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

其他详细信息

 kafka-connect:
    image: kafka-connect
    build:
      context: .
    hostname: kafka-connect
    container_name: kafka-connect
    depends_on:
      - zookeeper-1
      - kafka-broker-1
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: SSL://kafka-broker-1:19093
      CONNECT_GROUP_ID: 'kafka-connect'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_TOPIC: 'connect-config-storage'
      CONNECT_OFFSET_STORAGE_TOPIC: 'connect-offset-storage'
      CONNECT_STATUS_STORAGE_TOPIC: 'connect-status-storage'
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" 
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_PLUGIN_PATH: /etc/kafka/secrets/plugins
      CONNECT_SECURITY_PROTOCOL: 'SSL'
      CONNECT_SSL_KEY_PASSWORD: confluent
      CONNECT_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
      CONNECT_SSL_KEYSTORE_PASSWORD: confluent
      CONNECT_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
      CONNECT_SSL_TRUSTSTORE_PASSWORD: confluent
      CONNECT_KAFKASTORE_SECURITY_PROTOCOL: 'SSL'
      CONNECT_KAFKASTORE_SSL_KEY_PASSWORD: confluent
      CONNECT_KAFKASTORE_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
      CONNECT_KAFKASTORE_SSL_KEYSTORE_PASSWORD: confluent
      CONNECT_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
      CONNECT_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: confluent
      CONNECT_PRODUCER_SECURITY_PROTOCOL: 'SSL'
      CONNECT_PRODUCER_SSL_KEY_PASSWORD: confluent
      CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
      CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: confluent
      CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
      CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: confluent
      CONNECT_CONSUMER_SECURITY_PROTOCOL: 'SSL'
      CONNECT_CONSUMER_SSL_KEY_PASSWORD: confluent
      CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
      CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: confluent
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: confluent
    volumes:
      - ${KAFKA_SSL_SECRETS_DIR}/connects:/etc/kafka/secrets
    networks:
      - kafka-cluster-network

连接器属性配置(my-connector.properties)

 curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
          "name": "mqtt-source",
            "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
            "tasks.max": "1",
            "topics": "mqtt",
            "connect.mqtt.connection.clean": "true",
            "connect.mqtt.connection.timeout": "1000",
            "connect.mqtt.kcql": "INSERT INTO mqtt SELECT * FROM /ais",
            "connect.mqtt.connection.keep.alive": "1000",
            "connect.mqtt.source.converters": "/ais=com.datamountaineer.streamreactor.connect.converters.source.AvroConverter",
            "connect.source.converter.avro.schemas": "/ais=/etc/kafka/secrets/plugins/classAPositionReportSchema.json",
            "connect.mqtt.client.id": "dm_source_id",
            "connect.mqtt.converter.throw.on.error": "true",
            "connect.mqtt.hosts": "tcp://mqtt:1883",
            "connect.mqtt.service.quality": "1"
        }'  http://localhost:8083/connectors/mqtt-source/config | jq .

avro json 文件

{
    "type": "record",
    "name": "aisClassAPositionReport",
    "namespace": "com.landoop.ais",
    "doc": "Schema for AIS Class A Position Reports.",
    "fields": [
      {
        "name": "Type",
        "type": "int",
        "doc": "The type of the AIS Message. 1/2/3 are Class A position reports."
      },
      {
        "name": "Repeat",
        "type":"int",
        "doc": "Repeat Indicator"
      },
      {
        "name": "MMSI",
        "type": "long",
        "doc": "User ID (MMSI)"
      },
      {
        "name": "Speed",
        "type": "float",
        "doc": "Speed over Ground (SOG)"
      },
      {
        "name": "Accuracy",
        "type": "boolean",
        "doc": "Position Accuracy"
      },
      {
        "name": "Longitude",
        "type": "double",
        "doc": "Longitude"
      },
      {
        "name": "Latitude",
        "type": "double",
        "doc": "Latitude"
      },
      {
        "name": "Course",
        "type": "float",
        "doc": "Course over Ground (COG)"
      },
      {
        "name": "Heading",
        "type": "int",
        "doc": "True Heading (HDG)"
      },
      {
        "name": "Second",
        "type": "int",
        "doc": "Time Stamp"
      },
      {
        "name": "RAIM",
        "type": "boolean",
        "doc": "RAIM flag"
      },
      {
        "name": "Radio",
        "type": "long",
        "doc": "Radio Status"
      },
      {
        "name": "Status",
        "type": "int",
        "doc": "Navigation Status (enumerated type)"
      },
      {
        "name": "Turn",
        "type": "float",
        "doc": "Rate of Turn (ROT)"
      },
      {
        "name": "Maneuver",
        "type": "int",
        "doc": "Manuever Indicator (enumerated type)"
      },
      {
        "name": "Timestamp",
        "type": "long",
        "doc": "Time the message was encoded to avro (nanoseconds since epoch). May be used for ordering."
      }
    ]
  }

MQTT 代理消息

mosquitto_pub \
    -m "{\"Type\": 384558914, \"Repeat\": 1429873353, \"MMSI\": 1421443607430111832, \"Speed\": 0.32155126, \"Accuracy\": true, \"Longitude\": 0.3627212439937161, \"Latitude\": 0.2725890739370421, \"Course\": 0.99500954, \"Heading\": -2064209033, \"Second\": -1096102271, \"RAIM\": true, \"Radio\": -189624595456590919, \"Status\": -139830130, \"Turn\": 0.035991907, \"Maneuver\": 1595359693, \"Timestamp\": -932628952948741103}" \
    -d -r -t /ais

完整日志

sh-4.4$ kafka-avro-console-consumer --bootstrap-server kafka-broker-1:19093                                       --topic mqtt                                       --from-beginning                                       --max-messages 10                                       --consumer.config /etc/kafka/secrets/host.consumer.ssl.config                                       --property schema.registry.url=http://0.0.0.0:8081
[2022-04-13 05:49:22,333] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-04-13 05:49:23,082] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka-broker-1:19093]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = console-consumer
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-27706
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = SSL
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 45000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm =
        ssl.engine.factory.class = null
        ssl.key.password = [hidden]
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = /etc/kafka/secrets/kafka.consumer.keystore.jks
        ssl.keystore.password = [hidden]
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = /etc/kafka/secrets/kafka.consumer.truststore.jks
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-04-13 05:49:23,269] INFO Kafka version: 7.1.0-ce (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,269] INFO Kafka commitId: 5c05312ab63acecf (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,269] INFO Kafka startTimeMs: 1649828963261 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,274] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Subscribed to topic(s): mqtt (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-04-13 05:49:24,085] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Resetting the last seen epoch of partition mqtt-0 to 0 since the associated topicId changed from null to eLc9qW-WTemQ53DDH9JgzA (org.apache.kafka.clients.Metadata)
[2022-04-13 05:49:24,093] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Cluster ID: 7mB45_SgTXSxROWQruwrRQ (org.apache.kafka.clients.Metadata)
[2022-04-13 05:49:24,095] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Discovered group coordinator kafka-broker-1:19093 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,099] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,167] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Request joining group due to: need to re-join with the given member-id (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,168] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,174] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Successfully joined group with generation Generation{generationId=1, memberId='console-consumer-35015e12-2725-473a-b7b1-70cce478ed76', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,179] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Finished assignment for group at generation 1: {console-consumer-35015e12-2725-473a-b7b1-70cce478ed76=Assignment(partitions=[mqtt-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,202] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Successfully synced group in generation Generation{generationId=1, memberId='console-consumer-35015e12-2725-473a-b7b1-70cce478ed76', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,203] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Notifying assignor about the new Assignment(partitions=[mqtt-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,210] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Adding newly assigned partitions: mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,234] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Found no committed offset for partition mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,370] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Seeking to offset 1 for partition mqtt-0 (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-04-13 05:49:27,371] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Revoke previously assigned partitions mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,371] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Member console-consumer-35015e12-2725-473a-b7b1-70cce478ed76 sending LeaveGroup request to coordinator kafka-broker-1:19093 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,374] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Resetting generation due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,375] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,383] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,383] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,383] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,396] INFO App info kafka.consumer for console-consumer unregistered (org.apache.kafka.common.utils.AppInfoParser)
Processed a total of 1 messages
[2022-04-13 05:49:27,400] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:322)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:112)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:87)
        at io.confluent.kafka.formatter.AvroMessageFormatter$AvroMessageDeserializer.deserialize(AvroMessageFormatter.java:133)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:92)
        at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:181)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

更新:

还在架构注册表上注册了架构

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{ "schema": "{\"type\": \"record\",    \"name\": \"aisClassAPositionReport\",    \"namespace\": \"com.landoop.ais\",    \"doc\": \"Schema for AIS Class A Position Reports.\",    \"fields\": [{\"name\": \"Type\",\"type\": \"int\",\"doc\": \"The type of the AIS Message. 1/2/3 are Class A position reports.\"},{\"name\": \"Repeat\",\"type\":\"int\",\"doc\": \"Repeat Indicator\"},{\"name\": \"MMSI\",\"type\": \"long\",\"doc\": \"User ID (MMSI)\"},{\"name\": \"Speed\",\"type\": \"float\",\"doc\": \"Speed over Ground (SOG)\"},{\"name\": \"Accuracy\",\"type\": \"boolean\",\"doc\": \"Position Accuracy\"},{\"name\": \"Longitude\",\"type\": \"double\",\"doc\": \"Longitude\"},{\"name\": \"Latitude\",\"type\": \"double\",\"doc\": \"Latitude\"},{\"name\": \"Course\",\"type\": \"float\",\"doc\": \"Course over Ground (COG)\"},{\"name\": \"Heading\",\"type\": \"int\",\"doc\": \"True Heading (HDG)\"},{\"name\": \"Second\",\"type\": \"int\",\"doc\": \"Time Stamp\"},{\"name\": \"RAIM\",\"type\": \"boolean\",\"doc\": \"RAIM flag\"},{\"name\": \"Radio\",\"type\": \"long\",\"doc\": \"Radio Status\"},{\"name\": \"Status\",\"type\": \"int\",\"doc\": \"Navigation Status (enumerated type)\"},{\"name\": \"Turn\",\"type\": \"float\",\"doc\": \"Rate of Turn (ROT)\"},{\"name\": \"Maneuver\",\"type\": \"int\",\"doc\": \"Manuever Indicator (enumerated type)\"},{\"name\": \"Timestamp\",\"type\": \"long\",\"doc\": \"Time the message was encoded to avro (nanoseconds since epoch). May be used for ordering.\"}]}"}' http://0.0.0.0:8081/subjects/mqtt-value/versions

我做错了什么?

根据 CONNECT_VALUE_CONVERTER,您正在生成 JSON,而不是 Avro,因此您不能使用 kafka-avro-console-consumer 来读取 JSON 数据,并且您的 Avro 架构未被使用

我不确定 connect.mqtt.source.converters 是如何工作的,但您尚未将其配置为使用任何注册表,这是 kafka-avro-console-consumer 工作的必要条件