Kafka-connect to PostgreSQL - org.apache.kafka.connect.errors.DataException: Failed to deserialize topic to Avro

Setup

I have installed the latest version (7.0.1) of Confluent Platform in standalone mode on an Ubuntu virtual machine.

Python Avro producer

I am using this sample Avro producer to stream data to a Kafka topic (pmu214).

The producer seems to work fine. I can provide the full code on request. Producer output:

Raw data: {"pmu_id": 2, "time": 1644577001.22, "measurements": [{"stream_id": 2, "stat": "ok", "phasors": [[27.22379, 0.0], [24.672079638784002, -2.075237618663568], [25.11940552135938, 2.10660756475536], [3248.794237867336, -0.06468446412011757], [3068.6629010042793, -2.152472189017548], [2990.0809353594427, 2.031751749658583], [0.0, 0.0], [3101.9477751890026, -0.06193618455080409]], "analog": [], "digital": [0], "frequency": 50.022, "rocof": 0}]}
PMU record b'e5c1e5a8-3e44-465d-98c4-93f896ec1b14' successfully produced to pmu214 [0] at offset 38256

KSQL

In ksqlDB, the records seem to arrive fine:

rowtime: 2022/02/11 09:48:05.431 Z, key: [26151234-d3dd-4b7c-9222-2867@3486128305426751843/-], value: \x00\x00\x00\x00\x01\x04\xAA\xC3\xB1\xA0\x0C\x04\x04ok\xC2p\xD9A\x0F`\x
9F>.b\xC6A\xB9\xC8\xE2\xBF\xC5%\xC8A\x13v\x1A@\x8FVKE\xF9\xF8u>\xC5\xC5?E\xEA\xBA\xEC\xBF\xE0\xFA:E\xD5~\x15@\x00\x00\x00\x00\x00\x00\x00\x00\x84\x07BEs\xD1w>\x04[]\x020b\x0
2, partition: 0
Index 0 out of bounds for length 0
Topic printing ceased

Kafka Connect

This is the command I use to start the connector to PostgreSQL:

bin/connect-standalone etc/kafka/connect-standalone.properties etc/schema-registry/connect-avro-standalone.properties

Here is the content of connect-avro-standalone.properties:

bootstrap.servers=localhost:9092
name=sinkIRIpostgre
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://localhost:5432/mydb
topics=pmu214

connection.user=mydbuser
connection.password=mypass
auto.create=true
auto.evolve=true
insert.mode=insert
pk.mode=record_key
pk.fields=MESSAGE_KEY

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

Apart from adding the installed plugins, I have not made any changes to connect-standalone.properties.

plugin.path=/home/proton/kafkaConnectors/confluentinc-kafka-connect-jdbc-10.3.2,/home/proton/kafkaConverters/confluentinc-kafka-connect-json-schema-converter-7.0.1,/home/proton/kafkaConverters/confluentinc-kafka-connect-avro-converter-7.0.1/

Error received

ERROR [sinkIRIpostgre|task-0] WorkerSinkTask{id=sinkIRIpostgre-0} Error converting message key in topic 'pmu214' partition 0 at offset 0 and timestamp 1644560570372: Failed to deserialize data for topic pmu214 to Avro: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pmu214 to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:550)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)

If you ran the Python sample code as-is, the key is not Avro, so a failure in key.converter is expected, as the log message indicates:

Error converting message key
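
To see why the key fails while the value does not, it may help to check both against the Confluent Schema Registry wire format that AvroConverter expects: one magic byte 0x00 followed by a 4-byte big-endian schema ID. The sketch below is only an illustration. The helper name parse_wire_header is hypothetical, the key is copied from the producer output above, and the value prefix is copied from the ksqlDB output; it assumes the key was written as a plain UTF-8 string.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent Schema Registry wire format


def parse_wire_header(payload: bytes):
    """Return the schema ID if payload starts with the wire-format header
    (magic byte 0x00 + 4-byte big-endian schema ID), otherwise None.

    Hypothetical helper for illustration; not part of any Kafka library.
    """
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        return None
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id


# Key as printed by the producer: a UUID, assumed to be plain UTF-8 text.
key = b"e5c1e5a8-3e44-465d-98c4-93f896ec1b14"
# First bytes of the value as printed by ksqlDB.
value_prefix = b"\x00\x00\x00\x00\x01\x04\xaa\xc3\xb1\xa0\x0c"

print(parse_wire_header(key))           # None: no magic byte, not Avro-framed
print(parse_wire_header(value_prefix))  # 1: framed with schema ID 1
```

If the key really is a plain string, one possible fix is to leave value.converter as it is and set key.converter=org.apache.kafka.connect.storage.StringConverter in the connector properties. Whether that fits depends on how the producer actually serializes its keys.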