具有外部 https 连接的 kafka 模式

kafka schema with external https connection

我开始对卡夫卡感到绝望了。对于一个私人项目,一家公司给我发了一个卡夫卡流。 经过长时间的尝试,我终于成功连接到 bootstrap 服务器并收到了第一条消息。但没有反序列化。目前数据格式如下所示: 4868fa8▒▒▒▒_9601FLHBK053A5T▒z+B▒▒▒▒▒▒

公司以 avro 格式发送密钥和值,我也得到了几个模式 url。但我无法正确使用它们,因此无法获得可读数据。不管我怎么输入,总是报错。只有当我在没有任何模式的情况下检索时,我才会像上面那样收到消息。

架构 url 位于 https 和外部,因此我已经尝试为 url 创建信任库。有人可以提示我还可以尝试什么吗?

bin/kafka-avro-console-consumer --bootstrap-server kafka1.some.url:9093,kafka2.some.url:9093,kafka3.some.url:9093 --topic myTopic --consumer-property security.protocol=SSL --consumer-property ssl.protocol=TLSv1.2 --consumer-property ssl.truststore.location=ssl/myTruststore.jks --consumer-property ssl.truststore.password=xxx --consumer-property  ssl.keystore.location=ssl/keystore.jks --consumer-property ssl.keystore.password=xxx --consumer-property ssl.key.password=xxx --consumer-property schema.registry=https://schema-reg.some.url  --from-beginning

这里出现的第一个问题,我被告知 3 urls 的模式。我在程序调用中指定的一个基数 url,另外两个用于键,一个用于值。两者都具有以下格式:

https://schema-reg.some.url/subjects/Key/versions/latest/schema

https://schema-reg.some.url/subjects/Value/versions/latest/schema

看起来像(关键):

{"type":"record","name":"WoKey","namespace":"somenameSpace","fields":[{"name":"someId","type":"string","aliases":["userId"]},{"name":"nextId","type":["null","string"],"default":null}]}

不幸的是,我完全被难住了。我也得到了一个 typescipt,但我想直接在 kafka 中使用整个东西,然后通过 kafka-connect sink 将它写入 MySQL DB。

如果我使用上述设置启动 kafka,我总是会收到错误消息:

ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 423 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:333) at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:114) at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88) at io.confluent.kafka.formatter.AvroMessageFormatter$AvroMessageDeserializer.deserialize(AvroMessageFormatter.java:133) at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:92) at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:181) at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115) at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75) at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52) at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) Caused by: java.net.ConnectException: Connection refused (Connection refused)

由于错误显示“连接被拒绝”,我认为问题与 --consumer-property schema.registry=

有关

正确选项是--property schema.registry.url=https://...

关于提到的 URL,默认主题在主题名称后会有 -value-key

否则,一个建议是对文件使用 --consumer-config,而不是需要输入长命令选项。