Kafka 连接、JdbcSinkConnector - 获取 "Error retrieving Avro schema for id 1, Subject not found.; error code: 40401"
Kafka Connect, JdbcSinkConnector - Getting "Error retrieving Avro schema for id 1, Subject not found.; error code: 40401"
我创建了一个 NiFi 流,最终将 json 记录发布为具有 Avro 编码值和字符串键的记录,使用 Confluent Registry 中的模式作为值模式。这是 NiFi 中的 configuration for the AvroRecordSetWriter。
我现在正尝试使用 Kafka Connect (connect-standalone
) 通过 JdbcSinkConnector 将消息移动到 PostgreSQL 数据库,但出现以下错误:Error retrieving Avro schema for id 1
我已确认我的 Confluent Registry 中有一个 ID 为 1 的架构。以下是我对 Connect 任务的配置
工人配置:
bootstrap.servers=localhost:29092
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
rest.host.name=localhost
rest.port=8083
plugin.path=share/java
连接器配置:
name=pg-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds
connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=xxxxxxxx
insert.mode=upsert
table.name.format=test_data
auto.create=true
我在 NiFi 中创建了一个正确使用消息的流,并且我还通过指定 [=14] 使用 kafka-avro-console-consumer 成功使用了消息(在输出中格式为 JSON) =].请注意,我是 运行 Docker 容器中的消费者,这就是 url 不是本地主机的原因。
我不确定我错过了什么。我唯一的想法是我对密钥转换器使用了错误的 class,但是对于给定的错误,这没有意义。谁能看出我做错了什么?
我对 Nifi 了解不多,但我看到架构的名称是 "rds",并且在错误日志中说它没有在架构注册表中找到主题。
Kafka 使用 KafkaAvroSerializer
序列化 avro 记录,同时在模式注册表中注册关联的 avro 模式。
它使用 KafkaAvroDeserializer
反序列化 avro 记录并从架构注册表中检索关联的架构。
模式注册表将模式存储到称为 "subjects" 的类别中,为记录命名主题的默认行为是:topic_name-value
用于值记录,topic_name-key
用于键。
在你的例子中,你没有使用 Kafka 注册模式,而是使用 Nifi,所以我的猜测是名称 "rds" 出现在模式注册表中或者是模式注册表中的主题名称。
您如何验证您的架构是否正确存储?
通常在您的情况下,正确的主题将是 rds-value
,因为您仅在值记录上使用模式注册表。
我创建了一个 NiFi 流,最终将 json 记录发布为具有 Avro 编码值和字符串键的记录,使用 Confluent Registry 中的模式作为值模式。这是 NiFi 中的 configuration for the AvroRecordSetWriter。
我现在正尝试使用 Kafka Connect (connect-standalone
) 通过 JdbcSinkConnector 将消息移动到 PostgreSQL 数据库,但出现以下错误:Error retrieving Avro schema for id 1
我已确认我的 Confluent Registry 中有一个 ID 为 1 的架构。以下是我对 Connect 任务的配置
工人配置:
bootstrap.servers=localhost:29092
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
rest.host.name=localhost
rest.port=8083
plugin.path=share/java
连接器配置:
name=pg-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds
connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=xxxxxxxx
insert.mode=upsert
table.name.format=test_data
auto.create=true
我在 NiFi 中创建了一个正确使用消息的流,并且我还通过指定 [=14] 使用 kafka-avro-console-consumer 成功使用了消息(在输出中格式为 JSON) =].请注意,我是 运行 Docker 容器中的消费者,这就是 url 不是本地主机的原因。
我不确定我错过了什么。我唯一的想法是我对密钥转换器使用了错误的 class,但是对于给定的错误,这没有意义。谁能看出我做错了什么?
我对 Nifi 了解不多,但我看到架构的名称是 "rds",并且在错误日志中说它没有在架构注册表中找到主题。
Kafka 使用 KafkaAvroSerializer
序列化 avro 记录,同时在模式注册表中注册关联的 avro 模式。
它使用 KafkaAvroDeserializer
反序列化 avro 记录并从架构注册表中检索关联的架构。
模式注册表将模式存储到称为 "subjects" 的类别中,为记录命名主题的默认行为是:topic_name-value
用于值记录,topic_name-key
用于键。
在你的例子中,你没有使用 Kafka 注册模式,而是使用 Nifi,所以我的猜测是名称 "rds" 出现在模式注册表中或者是模式注册表中的主题名称。
您如何验证您的架构是否正确存储?
通常在您的情况下,正确的主题将是 rds-value
,因为您仅在值记录上使用模式注册表。