Kafka Connect BigQuery Sink Connector requests non-existing key-subject names from the Schema Registry
This is a follow-up to an earlier question.
When trying to use confluentinc/kafka-connect-bigquery on our Kafka (Avro) events, I run into the following error:
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic domain.rating.annotated to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:125)
[...]
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro key schema version for id 619
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'domain.rating.annotated-com.acme.message_schema.type.domain.key.DefaultKey' not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:295)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:355)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:440)
[...]
The subjects that do exist are:
curl --silent -X GET http://avro-schema-registry.core-kafka.svc.cluster.local:8081/subjects | jq .
[...]
"domain.rating.annotated-com.acme.message_schema.domain.rating.annotated.Key",
"domain.rating.annotated-com.acme.message_schema.domain.rating.annotated.RatingTranslated",
[...]
Why is it looking for ...DefaultKey, and how can I get it to do the right thing?
My properties/standalone.properties (I am using the quickstart folder) looks like this:
bootstrap.servers=kafka.core-kafka.svc.cluster.local:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://avro-schema-registry.core-kafka.svc.cluster.local:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://avro-schema-registry.core-kafka.svc.cluster.local:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
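A note on the last two property names, since the doubled "key"/"value" looks odd: as far as I understand, Connect hands each converter only the worker properties under its own prefix, with the prefix removed, so the AvroConverter for keys ends up seeing key.subject.name.strategy. The sketch below only illustrates that prefix stripping; parse_properties and with_prefix_stripped are hypothetical helper names, not part of Kafka Connect.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def with_prefix_stripped(props: dict[str, str], prefix: str) -> dict[str, str]:
    """Keep only the entries under `prefix` and drop the prefix from their names."""
    return {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}


# Three lines taken from the worker config shown above.
worker_props = parse_properties("""
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://avro-schema-registry.core-kafka.svc.cluster.local:8081
key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
""")

# What the key converter would be configured with under this assumption.
key_converter_config = with_prefix_stripped(worker_props, "key.converter.")
for name, value in key_converter_config.items():
    print(f"{name}={value}")
```

So the strategy set on this line is the one applied when key schemas are looked up.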
Ah, I needed the following:
key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
That did the trick. :)
I really struggle to navigate the documentation. Sorry for the redundant question.
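For anyone landing here, a rough sketch of why the lookup went wrong. It assumes the usual naming rules of the three Confluent subject name strategies (topic plus "-key"/"-value", record name only, topic plus record name) and that the key schema's fully qualified record name is com.acme.message_schema.type.domain.key.DefaultKey, which I am inferring from the error message. subject_name is a hypothetical helper, not a Schema Registry API.

```python
def subject_name(strategy: str, topic: str, record_name: str, is_key: bool) -> str:
    """Return the subject a (de)serializer would look up under the given strategy."""
    if strategy == "TopicNameStrategy":
        return f"{topic}-{'key' if is_key else 'value'}"
    if strategy == "RecordNameStrategy":
        return record_name
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_name}"
    raise ValueError(f"unknown strategy: {strategy}")


topic = "domain.rating.annotated"
# Assumption: record name of the key schema, taken from the error message.
key_record = "com.acme.message_schema.type.domain.key.DefaultKey"

for strategy in ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy"):
    subject = subject_name(strategy, topic, key_record, is_key=True)
    print(f"{strategy}: {subject}")
```

With TopicRecordNameStrategy on the key converter, the computed subject is exactly the one from the "not found" error. Switching the key converter to RecordNameStrategy makes it look up the bare record name instead, which is presumably how the key schema was registered by the producer.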