Kafka 连接、JdbcSinkConnector - 获取 "Error retrieving Avro schema for id 1, Subject not found.; error code: 40401"

Kafka Connect, JdbcSinkConnector - Getting "Error retrieving Avro schema for id 1, Subject not found.; error code: 40401"

我创建了一个 NiFi 流,最终将 json 记录发布为具有 Avro 编码值和字符串键的记录,使用 Confluent Registry 中的模式作为值模式。这是 NiFi 中的 configuration for the AvroRecordSetWriter。

我现在正尝试使用 Kafka Connect (connect-standalone) 通过 JdbcSinkConnector 将消息移动到 PostgreSQL 数据库,但出现以下错误:Error retrieving Avro schema for id 1

我已确认我的 Confluent Registry 中有一个 ID 为 1 的架构。以下是我对 Connect 任务的配置

工人配置:

bootstrap.servers=localhost:29092
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
rest.host.name=localhost
rest.port=8083
plugin.path=share/java

连接器配置:

name=pg-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds
connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=xxxxxxxx
insert.mode=upsert
table.name.format=test_data
auto.create=true

我在 NiFi 中创建了一个正确使用消息的流,并且我还通过指定 [=14] 使用 kafka-avro-console-consumer 成功使用了消息(在输出中格式为 JSON) =].请注意,我是 运行 Docker 容器中的消费者,这就是 url 不是本地主机的原因。

我不确定我错过了什么。我唯一的想法是我对密钥转换器使用了错误的 class,但是对于给定的错误,这没有意义。谁能看出我做错了什么?

我对 Nifi 了解不多,但我看到架构的名称是 "rds",并且在错误日志中说它没有在架构注册表中找到主题。

Kafka 使用 KafkaAvroSerializer 序列化 avro 记录,同时在模式注册表中注册关联的 avro 模式。 它使用 KafkaAvroDeserializer 反序列化 avro 记录并从架构注册表中检索关联的架构。

模式注册表将模式存储到称为 "subjects" 的类别中,为记录命名主题的默认行为是:topic_name-value 用于值记录,topic_name-key 用于键。

在你的例子中,你没有使用 Kafka 注册模式,而是使用 Nifi,所以我的猜测是名称 "rds" 出现在模式注册表中或者是模式注册表中的主题名称。

您如何验证您的架构是否正确存储?

通常在您的情况下,正确的主题将是 rds-value,因为您仅在值记录上使用模式注册表。