Sink connector looking for topic-value schema instead of record name

Below is the payload of my JDBC sink connector:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
table.name.format=Subs_V1
behavior.on.null.values=ignore
connection.password=***********
topics=Subscription
tasks.max=1
batch.size=500
value.converter.schema.registry.url=http://localhost:8081
auto.evolve=true
connection.user=admin
name=sink-jdbc-connector-Subs
errors.tolerance=all
auto.create=true
value.converter=io.confluent.connect.avro.AvroConverter
connection.url=jdbc:sqlserver://MsSql-Instanse:dbName.    
insert.mode=upsert
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
pk.mode=record_value
pk.fields=id

I get Schema Registry logs like this:

at java.base/java.lang.Thread.run(Thread.java:829)
[2022-01-19 08:24:35,474] INFO 127.0.0.1 - - [19/Jan/2022:08:24:35 +0000] "POST /subjects/Subscription-value?deleted=true HTTP/1.1" 404 96  1 (io.confluent.rest-utils.requests:62)
[2022-01-19 08:24:35,475] INFO Schema lookup under subject Subscription-value, deleted true, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource:91)
[2022-01-19 08:24:35,475] ERROR Request Failed with exception  (io.confluent.rest.exceptions.DebuggableExceptionMapper:62)
io.confluent.rest.exceptions.RestNotFoundException: Subject 'Subscription-value' not found.
    at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:69)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource.lookUpSchemaUnderSubject(SubjectsResource.java:105)
    at jdk.internal.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at 

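Please tell me how to point my JDBC sink connector at the Subscription.Value schema instead of Subscription-value.

My connector shows the error above: it reads the records, but because the schema is not found, it does not write them to the database.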

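By default, any Avro producer should have created that -value subject. If yours did not, the converter has to match the subject name strategy the producer's serializer used. For example, you would need to set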

value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
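To make the difference concrete, here is a minimal Python sketch of how the three standard subject name strategies derive the subject that gets looked up in Schema Registry. The function `subject_name` is hypothetical and only for illustration (it is not part of any Confluent library), and it assumes that Subscription.Value from the question is the fully qualified Avro record name.

```python
def subject_name(strategy: str, topic: str, record_fullname: str,
                 is_key: bool = False) -> str:
    """Illustrative only: mimic how each strategy builds a subject name."""
    suffix = "key" if is_key else "value"
    if strategy == "TopicNameStrategy":
        # Default: subject is derived from the topic name alone.
        return f"{topic}-{suffix}"
    if strategy == "RecordNameStrategy":
        # Subject is the fully qualified record name; the topic is ignored.
        return record_fullname
    if strategy == "TopicRecordNameStrategy":
        # Subject combines the topic and the fully qualified record name.
        return f"{topic}-{record_fullname}"
    raise ValueError(f"unknown strategy: {strategy}")


if __name__ == "__main__":
    # Assumed values: topic from the connector config, record name from the question.
    for s in ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy"):
        print(s, "->", subject_name(s, "Subscription", "Subscription.Value"))
```

With the default strategy the converter asks for Subscription-value, which matches the 404 in the log; with RecordNameStrategy it would ask for Subscription.Value instead.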

Also, ideally your keys would not be Avro at all, but a simple type such as a string or an integer.