Kafka sink connector --> postgres, fails with avro JSON data

I set up a Kafka JDBC sink connector to send events to PostgreSQL. I wrote this simple producer, which sends JSON with schema (Avro) data to the topic, as shown below:

producer.py (kafka-python)

biometrics = {
    "heartbeat": self.pulse,        # integer
    "oxygen": self.oxygen,          # integer
    "temprature": self.temprature,  # float
    "time": time                    # string
}

avro_value = {
    "schema": open(BASE_DIR + "/biometrics.avsc").read(),
    "payload": biometrics
}

producer.send("biometrics",
              key="some_string",
              value=avro_value)
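
kafka-python only ships bytes over the wire, so sending a dict like avro_value presumes the producer was built with a JSON value_serializer; a minimal sketch of such a setup (the bootstrap address is a placeholder):

from kafka import KafkaProducer
import json

# kafka-python sends raw bytes, so the dict value above needs a JSON serializer.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # placeholder
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)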

Value schema:

{
    "type": "record",
    "name": "biometrics",
    "namespace": "athlete",
    "doc": "athletes biometrics"
    "fields": [
        {
            "name": "heartbeat",
            "type": "int",
            "default": 0
        },
        {
            "name": "oxygen",
            "type": "int",
            "default": 0
        },
        {
            "name": "temprature",
            "type": "float",
            "default": 0.0
        },
        {
            "name": "time",
            "type": "string"
            "default": ""
        }
    ]
}

Connector config (without host, password, etc.):

{
    "name": "jdbc_sink",
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter ",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "biometrics",
    "insert.mode": "insert",
    "auto.create": "true"
}

But my connector fails hard with three errors, and I can't figure out the cause of any of them:

TL;DR version of the logs:

(Error 1) Caused by: org.apache.kafka.connect.errors.DataException: biometrics
(Error 2) Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
(Error 3) Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Full log:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: biometrics
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:98)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:498)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Can anyone help me understand these errors and their root cause?

The errors are there because of what you are producing: you would need to use the JsonConverter class with value.converter.schemas.enable=true in your connector. But the schema you are embedding is an Avro schema string, not the Connect JSON schema representation of the payload that the JsonConverter expects, so making that change alone may still fail...
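
To make that concrete, here is a sketch of the envelope the JsonConverter (with value.converter set to org.apache.kafka.connect.json.JsonConverter and value.converter.schemas.enable=true) would expect as the message value. Note that the embedded schema uses Connect types such as struct, int32, and float, not the Avro syntax from biometrics.avsc; the payload reuses the biometrics dict from the producer:

connect_value = {
    "schema": {
        "type": "struct",
        "name": "biometrics",
        "optional": False,
        "fields": [
            # Connect schema types, not Avro types
            {"field": "heartbeat",  "type": "int32",  "optional": False},
            {"field": "oxygen",     "type": "int32",  "optional": False},
            {"field": "temprature", "type": "float",  "optional": False},
            {"field": "time",       "type": "string", "optional": False}
        ]
    },
    "payload": biometrics  # the plain record, same as before
}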

If you want to actually send Avro, use the AvroProducer from the confluent-kafka library, which requires running a Schema Registry.
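
A minimal sketch of that approach, assuming a Schema Registry reachable at http://localhost:8081 (the key is omitted here, since AvroProducer would also want an Avro schema for the key, which would not match your StringConverter key config):

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Reuse the same schema file the original producer embedded as a string.
value_schema = avro.load(BASE_DIR + "/biometrics.avsc")

producer = AvroProducer(
    {
        "bootstrap.servers": "localhost:9092",           # placeholder
        "schema.registry.url": "http://localhost:8081",  # placeholder
    },
    default_value_schema=value_schema,
)

# The value is now just the record itself; the serializer registers the
# schema and prepends the magic byte + schema id that AvroConverter expects.
producer.produce(topic="biometrics", value=biometrics)
producer.flush()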