Kafka Connect BigQuery Sink Connector requests incorrect subject names from the Schema Registry

When trying to use confluentinc/kafka-connect-bigquery with our Kafka (Avro) events, I run into the following error:

org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic domain.user to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:125)
[...]
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro key schema version for id 619
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'domain.user-key' not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:295)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:355)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:440)
[...]

Indeed, there is no subject domain.user-key in the Schema Registry:

curl --silent -X GET http://avro-schema-registry.core-kafka.svc.cluster.local:8081/subjects | jq . | grep "domain\.user"
[...]
  "domain.user-com.acme.message_schema.type.domain.key.DefaultKey",
  "domain.user-com.acme.message_schema.domain.user.Key",
[...]
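
For what it's worth, the record-name-based subject from that listing can be queried directly (same registry URL as above; /subjects/{subject}/versions is the standard Schema Registry endpoint for listing a subject's registered versions):

# List the schema versions registered under the record-name-based subject
curl --silent -X GET http://avro-schema-registry.core-kafka.svc.cluster.local:8081/subjects/domain.user-com.acme.message_schema.domain.user.Key/versions | jq .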

How can I get the connector to request the correct subject names?

My properties/connector.properties (I am using the quickstart folder) looks like this:

[...]
topics=domain.user
sanitizeTopics=true
autoUpdateSchemas=true
autoCreateTables=true
allowNewBigQueryFields=true
[...]

Eventually, I would like to use topics.regex=domain.* instead of topics=domain.user to capture all of our domain event topics, but then I get the same kind of error (just for different topics).

You need to set key.converter.key.subject.name.strategy and value.converter.value.subject.name.strategy to io.confluent.kafka.serializers.subject.TopicRecordNameStrategy. By default the Avro converter uses TopicNameStrategy, which looks up subjects named <topic>-key and <topic>-value (hence the request for domain.user-key), while your schemas are registered under <topic>-<fully-qualified record name> subjects.
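
In the connector or worker properties, that would look roughly like this (a sketch only; the AvroConverter classes and the registry URL are taken from the stack trace and the curl call above, so adjust them to wherever your converters are actually configured):

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://avro-schema-registry.core-kafka.svc.cluster.local:8081
# Look up key schemas under <topic>-<fully-qualified record name> instead of <topic>-key
key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://avro-schema-registry.core-kafka.svc.cluster.local:8081
# Look up value schemas under <topic>-<fully-qualified record name> instead of <topic>-value
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

With this strategy the subject is derived per topic and record (e.g. domain.user-com.acme.message_schema.domain.user.Key), so it should also work unchanged with topics.regex=domain.*.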