Kafka Connect BigQuery Sink Connector requests incorrect subject names from the Schema Registry
While trying to use confluentinc/kafka-connect-bigquery on our Kafka (Avro) events, I run into the following error:
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic domain.user to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:125)
[...]
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro key schema version for id 619
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'domain.user-key' not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:295)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:355)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:440)
[...]
And indeed, there is no subject domain.user-key in the Schema Registry:
curl --silent -X GET http://avro-schema-registry.core-kafka.svc.cluster.local:8081/subjects | jq . | grep "domain\.user"
[...]
"domain.user-com.acme.message_schema.type.domain.key.DefaultKey",
"domain.user-com.acme.message_schema.domain.user.Key",
[...]
How can I make the connector use the correct subject names?
My properties/connector.properties (I'm using the quickstart folder) looks like this:
[...]
topics=domain.user
sanitizeTopics=true
autoUpdateSchemas=true
autoCreateTables=true
allowNewBigQueryFields=true
[...]
Ultimately, I would like to use topics.regex=domain.* instead of topics=domain.user to capture all of our domain event topics, but I get the same kind of error (just for a different topic).
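You need to set key.converter.key.subject.name.strategy and value.converter.value.subject.name.strategy to io.confluent.kafka.serializers.subject.TopicRecordNameStrategy. By default the converter uses TopicNameStrategy, which looks up <topic>-key and <topic>-value, whereas the subjects in your registry are named <topic>-<record name>.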
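A minimal sketch of how these settings might look, assuming the Avro converter handles both keys and values and that the Schema Registry is the one queried by the curl command in the question. The converter class lines are assumptions, since the question elides the rest of the configuration, and whether the lines belong in the worker properties file or in properties/connector.properties depends on your setup.

```properties
# Assumed: Avro converter for keys and values (not shown in the question's excerpt)
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

# Registry URL taken from the curl command in the question
key.converter.schema.registry.url=http://avro-schema-registry.core-kafka.svc.cluster.local:8081
value.converter.schema.registry.url=http://avro-schema-registry.core-kafka.svc.cluster.local:8081

# Look up subjects as <topic>-<fully qualified record name>
key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
```

With these in place, the key lookup for topic domain.user should resolve to a subject such as domain.user-com.acme.message_schema.domain.user.Key. The strategy is applied per record, so the same settings should also work with topics.regex=domain.*.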