Deserializing JSON data from Kafka stream - Kafka connect to PostgreSQL

I'm streaming a topic from Kafka_2.12-3.0.0 in standalone mode on Ubuntu into PostgreSQL and I'm getting a deserialization error.

The Kafka stream is produced in Python with confluent_kafka from pip (this part works fine):

{"pmu_id": 2, "time": 1644329854.08, "stream_id": 2, "stat": "ok", "ph_i1_r": 27.682000117654074, "ph_i1_j": -1.546410917622178, "ph_i2_r": 25.055846468243697, "ph_i2_j": 2.6658974347348012, "ph_i3_r": 25.470616978816988, "ph_i3_j": 0.5585993153435624, "ph_v4_r": 3338.6901623241415, "ph_v4_j": -1.6109426103444193, "ph_v5_r": 3149.0595421490525, "ph_v5_j": 2.5863594222073076, "ph_v6_r": 3071.4231229187553, "ph_v6_j": 0.4872377558335442, "ph_7_r": 0.0, "ph_7_j": 0.0, "ph_8_r": 3186.040175515683, "ph_8_j": -1.6065850592620299, "analog": [], "digital": 0, "frequency": 50.014, "rocof": 1}

Configuration for storing in PostgreSQL

In my kafka_2.12-3.0.0/config/connect-standalone.properties I added the connector and the converter:

plugin.path=/home/user/kafkaConnectors/confluentinc-kafka-connect-jdbc-10.3.2,/home/user/kafkaConverters/confluentinc-kafka-connect-json-schema-converter-7.0.1

I am running:

bin/connect-standalone.sh config/connect-standalone.properties config/sink-postgres.properties

My config/sink-postgres.properties:

name=sinkIRIpostgre
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://localhost:5432/pgdb
topics=pmu1
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
connection.user=pguser
connection.password=pgpass
auto.create=true
auto.evolve=true
insert.mode=insert
pk.mode=record_key
pk.fields=MESSAGE_KEY

I get the following error:

ERROR [sinkIRIpostgre|task-0] WorkerSinkTask{id=sinkIRIpostgre-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:193)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic pmu214:
        at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
        at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:177)
        at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:235)
        at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)
        at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)
        ... 18 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
        at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:112)

EDIT (Python code)

Here is the Python code used for the Kafka producer:

from confluent_kafka import Producer
..
p = Producer({'bootstrap.servers': self.kafka_bootstrap_servers})
...
record_key = str(uuid.uuid4())
record_value = self.createKafkaJSON(base_message)
p.produce(self.kafka_topic, key=record_key, value=record_value)
p.poll(0)

The function createKafkaJSON returns json.dumps(kafkaDictFinal).encode('utf-8'), where kafkaDictFinal is a Python dictionary.
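In essence it does something like this (simplified sketch only; the real method builds kafkaDictFinal from base_message and the PMU fields shown above):

import json

def createKafkaJSON(self, base_message):
    # Simplified: kafkaDictFinal is derived from base_message (details omitted)
    kafkaDictFinal = dict(base_message)
    # Plain UTF-8 encoded JSON bytes, with no Schema Registry framing
    return json.dumps(kafkaDictFinal).encode('utf-8')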

The producer is called in main like this:

  KafkaPMUProducer(pdc_id=2, pmu_ip="x.x.x.x", pmu_port=4712, kafka_bootstrap_servers ="localhost:9092", kafka_topic="pmu214").kafka_producer()

If you're writing straight JSON from your Python app, then you need to use the org.apache.kafka.connect.json.JsonConverter converter, but your messages will need schema and payload attributes.
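With value.converter.schemas.enable=true on that converter, a message with the required envelope could be built like this (a minimal sketch showing only a few of your fields; the field list would need to cover the whole message, and the struct name is hypothetical):

import json

# "schema" + "payload" envelope expected by org.apache.kafka.connect.json.JsonConverter
# when schemas.enable=true
record_value = json.dumps({
    "schema": {
        "type": "struct",
        "name": "pmu_reading",          # hypothetical struct name
        "optional": False,
        "fields": [
            {"field": "pmu_id",    "type": "int32",  "optional": False},
            {"field": "time",      "type": "double", "optional": False},
            {"field": "frequency", "type": "double", "optional": False}
        ]
    },
    "payload": {
        "pmu_id": 2,
        "time": 1644329854.08,
        "frequency": 50.014
    }
}).encode('utf-8')

The sink would then use value.converter=org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable=true instead of the JsonSchemaConverter / Schema Registry settings.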

io.confluent.connect.json.JsonSchemaConverter relies on the Schema Registry wire format, which includes a "magic byte" (hence the error you're seeing).
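You can see the difference by looking at the raw bytes on the topic: records produced through the Schema Registry serializers start with a zero byte followed by a 4-byte schema ID, whereas json.dumps(...).encode('utf-8') output starts with '{'. A small helper (the function name is mine) to check a raw record value:

import struct

def looks_like_schema_registry_framing(raw: bytes) -> bool:
    # Confluent wire format: magic byte 0x00, then a 4-byte big-endian
    # schema ID, then the serialized data.
    if len(raw) > 5 and raw[0] == 0:
        schema_id = struct.unpack('>I', raw[1:5])[0]
        print(f"wire format, schema id {schema_id}")
        return True
    print("plain bytes, starts with:", raw[:1])
    return False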

You can learn more in this deep-dive article about serialisation and Kafka Connect, and see how Python can produce JSON data with a schema using SerializingProducer.
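A rough sketch of that route, assuming a Schema Registry running on localhost:8081 and a reduced JSON Schema covering only a few of the fields (the real schema would need to describe the full message):

import uuid
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer

# Reduced JSON Schema, for illustration only
schema_str = """
{
  "type": "object",
  "properties": {
    "pmu_id":    {"type": "integer"},
    "time":      {"type": "number"},
    "frequency": {"type": "number"}
  }
}
"""

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
json_serializer = JSONSerializer(schema_str, schema_registry_client)

p = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': json_serializer,
})

p.produce('pmu214',
          key=str(uuid.uuid4()),
          value={'pmu_id': 2, 'time': 1644329854.08, 'frequency': 50.014})
p.flush()

Note that the key here is a plain string, and your stack trace fails in convertKey, so with this approach the sink's key.converter would need to be something like org.apache.kafka.connect.storage.StringConverter rather than JsonSchemaConverter.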