无法将主题的数据反序列化到 protobuf 接收器连接器
Failed to deserialize data for topic to protobuf sink connector
我可以使用不同的工具(如 conduktor)使用来自 kafka 主题的生成的 protobuf 消息数据。但是,当我尝试使用 JdbcSinkConnector 轮询数据时,它会抛出异常
org.apache.kafka.common.errors.SerializationException: Error
deserializing Protobuf message
我调用kafka-connect api时请看下面的错误详情
URL : http://localhost:8083/connectors?expand=info&expand=status
JSON响应和跟踪
{
"sink_postgres_03_proto":{
"info":{
"name":"sink_postgres_03_proto",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.password":"54321",
"topics":"order-messages",
"value.converter.schema.registry.url":"http://localhost:8081",
"key.converter.schemas.enable":"false",
"auto.evolve":"true",
"connection.user":"postgres",
"value.converter.schemas.enable":"true",
"name":"sink_postgres_03_proto",
"auto.create":"true",
"connection.url":"jdbc:postgresql://localhost:5432/CallHistoryService",
"value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"insert.mode":"insert",
"key.converter":"org.apache.kafka.connect.storage.StringConverter"
},
"tasks":[
{
"connector":"sink_postgres_03_proto",
"task":0
}
],
"type":"sink"
},
"status":{
"name":"sink_postgres_03_proto",
"connector":{
"state":"RUNNING",
"worker_id":"kafka-connect:8083"
},
"tasks":[
{
"id":0,
"state":"FAILED",
"worker_id":"kafka-connect:8083",
"trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic order-messages to Protobuf: \n\tat io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:495)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 1\nCaused by: java.net.ConnectException: Connection refused (Connection refused)\n\tat java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)\n\tat java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)\n\tat java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)\n\tat java.base/java.net.Socket.connect(Socket.java:609)\n\tat java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)\n\tat java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:474)\n\tat java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:569)\n\tat java.base/sun.net.www.http.HttpClient.<init>(HttpClient.java:242)\n\tat java.base/sun.net.www.http.HttpClient.New(HttpClient.java:341)\n\tat java.base/sun.net.www.http.HttpClient.New(HttpClient.java:362)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1253)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1015)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1592)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)\n\tat java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)\n\tat io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:117)\n\tat io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaProtobufDeserializer.java:235)\n\tat io.confluent.connect.protobuf.ProtobufConverter$Deserializer.deserialize(ProtobufConverter.java:163)\n\tat io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:107)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:495)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n"
}
],
"type":"sink"
}
}
}
请指教
谢谢!
如果您使用 trace
节点并将 \n
和 \t
转换为换行符和制表符,您将获得一个可读的堆栈跟踪,其中显示了问题:
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic order-messages to Protobuf:
at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 1
Caused by: java.net.ConnectException: Connection refused (Connection refused)
错误是 java.net.ConnectException: Connection refused
,这意味着 (a) 您错误配置了架构注册表的位置,或者 (b) 架构注册表不是 运行
我可以使用不同的工具(如 conduktor)使用来自 kafka 主题的生成的 protobuf 消息数据。但是,当我尝试使用 JdbcSinkConnector 轮询数据时,它会抛出异常
org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message
我调用kafka-connect api时请看下面的错误详情
URL : http://localhost:8083/connectors?expand=info&expand=status
JSON响应和跟踪
{
"sink_postgres_03_proto":{
"info":{
"name":"sink_postgres_03_proto",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.password":"54321",
"topics":"order-messages",
"value.converter.schema.registry.url":"http://localhost:8081",
"key.converter.schemas.enable":"false",
"auto.evolve":"true",
"connection.user":"postgres",
"value.converter.schemas.enable":"true",
"name":"sink_postgres_03_proto",
"auto.create":"true",
"connection.url":"jdbc:postgresql://localhost:5432/CallHistoryService",
"value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"insert.mode":"insert",
"key.converter":"org.apache.kafka.connect.storage.StringConverter"
},
"tasks":[
{
"connector":"sink_postgres_03_proto",
"task":0
}
],
"type":"sink"
},
"status":{
"name":"sink_postgres_03_proto",
"connector":{
"state":"RUNNING",
"worker_id":"kafka-connect:8083"
},
"tasks":[
{
"id":0,
"state":"FAILED",
"worker_id":"kafka-connect:8083",
"trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic order-messages to Protobuf: \n\tat io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:495)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 1\nCaused by: java.net.ConnectException: Connection refused (Connection refused)\n\tat java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)\n\tat java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)\n\tat java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)\n\tat java.base/java.net.Socket.connect(Socket.java:609)\n\tat java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)\n\tat java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:474)\n\tat java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:569)\n\tat java.base/sun.net.www.http.HttpClient.<init>(HttpClient.java:242)\n\tat java.base/sun.net.www.http.HttpClient.New(HttpClient.java:341)\n\tat java.base/sun.net.www.http.HttpClient.New(HttpClient.java:362)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1253)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1015)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1592)\n\tat java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)\n\tat java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)\n\tat io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:117)\n\tat io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaProtobufDeserializer.java:235)\n\tat io.confluent.connect.protobuf.ProtobufConverter$Deserializer.deserialize(ProtobufConverter.java:163)\n\tat io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:107)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:495)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n"
}
],
"type":"sink"
}
}
}
请指教
谢谢!
如果您使用 trace
节点并将 \n
和 \t
转换为换行符和制表符,您将获得一个可读的堆栈跟踪,其中显示了问题:
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic order-messages to Protobuf:
at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 1
Caused by: java.net.ConnectException: Connection refused (Connection refused)
错误是 java.net.ConnectException: Connection refused
,这意味着 (a) 您错误配置了架构注册表的位置,或者 (b) 架构注册表不是 运行