Kafka-connect to PostgreSQL - org.apache.kafka.connect.errors.DataException: Failed to deserialize topic to Avro
Setup
I have installed the latest (7.0.1) version of the Confluent Platform in standalone mode on an Ubuntu virtual machine.
Python Avro producer
A stream of data is produced to the Kafka topic (pmu214) using this example Avro producer.
The producer seems to be working fine. I'll provide the full code on request.
Producer output:
Raw data: {"pmu_id": 2, "time": 1644577001.22, "measurements": [{"stream_id": 2, "stat": "ok", "phasors": [[27.22379, 0.0], [24.672079638784002, -2.075237618663568], [25.11940552135938, 2.10660756475536], [3248.794237867336, -0.06468446412011757], [3068.6629010042793, -2.152472189017548], [2990.0809353594427, 2.031751749658583], [0.0, 0.0], [3101.9477751890026, -0.06193618455080409]], "analog": [], "digital": [0], "frequency": 50.022, "rocof": 0}]}
PMU record b'e5c1e5a8-3e44-465d-98c4-93f896ec1b14' successfully produced to pmu214 [0] at offset 38256
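For context, a minimal sketch of this kind of producer (not the actual code, which is available on request). It assumes confluent-kafka-python, a simplified two-field stand-in schema, and a plain UTF-8 UUID string key, as the b'e5c1e5a8-…' key above suggests:

import uuid

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

# Simplified stand-in schema; the real record also carries measurements,
# phasors, frequency, etc., as shown in the raw data above.
VALUE_SCHEMA = """
{
  "type": "record",
  "name": "PmuRecord",
  "fields": [
    {"name": "pmu_id", "type": "int"},
    {"name": "time", "type": "double"}
  ]
}
"""

registry = SchemaRegistryClient({"url": "http://localhost:8081"})

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",
    # The key is a plain UTF-8 string (a UUID), not Avro.
    "key.serializer": StringSerializer("utf_8"),
    # Only the value is serialized as Avro via Schema Registry.
    "value.serializer": AvroSerializer(registry, VALUE_SCHEMA),
})

producer.produce("pmu214", key=str(uuid.uuid4()),
                 value={"pmu_id": 2, "time": 1644577001.22})
producer.flush()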
KSQL
The records seem to be reaching ksqldb OK:
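Output of this shape typically comes from ksqlDB's PRINT statement; a minimal example, with only the topic name taken from above:

PRINT 'pmu214' FROM BEGINNING;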
rowtime: 2022/02/11 09:48:05.431 Z, key: [26151234-d3dd-4b7c-9222-2867@3486128305426751843/-], value: \x00\x00\x00\x00\x01\x04\xAA\xC3\xB1\xA0\x0C\x04\x04ok\xC2p\xD9A\x0F`\x9F>.b\xC6A\xB9\xC8\xE2\xBF\xC5%\xC8A\x13v\x1A@\x8FVKE\xF9\xF8u>\xC5\xC5?E\xEA\xBA\xEC\xBF\xE0\xFA:E\xD5~\x15@\x00\x00\x00\x00\x00\x00\x00\x00\x84\x07BEs\xD1w>\x04[]\x020b\x02, partition: 0
Index 0 out of bounds for length 0
Topic printing ceased
Kafka Connect
This is the command used to connect to PostgreSQL:
bin/connect-standalone etc/kafka/connect-standalone.properties etc/schema-registry/connect-avro-standalone.properties
Here are the contents of connect-avro-standalone.properties:
bootstrap.servers=localhost:9092
name=sinkIRIpostgre
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://localhost:5432/mydb
topics=pmu214
connection.user=mydbuser
connection.password=mypass
auto.create=true
auto.evolve=true
insert.mode=insert
pk.mode=record_key
pk.fields=MESSAGE_KEY
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
Apart from the plugins installed, I haven't made any changes to connect-standalone.properties:
plugin.path=/home/proton/kafkaConnectors/confluentinc-kafka-connect-jdbc-10.3.2,/home/proton/kafkaConverters/confluentinc-kafka-connect-json-schema-converter-7.0.1,/home/proton/kafkaConverters/confluentinc-kafka-connect-avro-converter-7.0.1/
The error received:
ERROR [sinkIRIpostgre|task-0] WorkerSinkTask{id=sinkIRIpostgre-0} Error converting message key in topic 'pmu214' partition 0 at offset 0 and timestamp 1644560570372: Failed to deserialize data for topic pmu214 to Avro: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pmu214 to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:550)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
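One quick way to confirm whether the key bytes are Avro-framed is to read one record and check for the Confluent wire-format magic byte (0x00 followed by a 4-byte schema id). A sketch, assuming confluent-kafka-python; the group id is arbitrary:

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "key-format-check",  # arbitrary debug group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["pmu214"])

msg = consumer.poll(10.0)
if msg is not None and msg.key() is not None:
    # Confluent Avro wire format starts with magic byte 0x00;
    # a plain-string UUID key will start with an ASCII character instead.
    framed = msg.key()[0] == 0
    print(msg.key()[:16], "Avro-framed" if framed else "not Avro-framed")
consumer.close()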
If you ran the Python example code literally, the key is not Avro, so it is expected to fail on the key.converter, as indicated by:
Error converting message key
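Given that, the likely fix (an assumption following from the above, not a verified config) is to keep Avro only for the value and switch the key converter to a string converter:

key.converter=org.apache.kafka.connect.storage.StringConverter

With pk.mode=record_key and a primitive key, pk.fields (MESSAGE_KEY here) simply names the column the key is written to.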