如何检查 avro 架构注册表使用情况
how to check avro schema registry usage
我正在通过 avro4s 使用 avro。
这是我对 consumer/producer
的配置
def producerSettings(system: ActorSystem): ProducerSettings[String, Array[Byte]] = ProducerSettings(
system,
new StringSerializer,
new ByteArraySerializer)
.withBootstrapServers("localhost:9092")
.withProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
.withProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
.withProperty("key.converter.schema.registry.url", "http://localhost:8081")
.withProperty("value.converter.schema.registry.url", "http://localhost:8081")
.withProperty("schema.registry.url", "http://localhost:8081")
.withProperty("auto.create.topics.enable", "true")
def consumerSettings(system: ActorSystem): ConsumerSettings[String, Array[Byte]] =
ConsumerSettings(
system,
new StringDeserializer,
new ByteArrayDeserializer)
.withBootstrapServers("localhost:9092")
.withProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
.withProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
.withProperty("key.converter.schema.registry.url", "http://localhost:8081")
.withProperty("value.converter.schema.registry.url", "http://localhost:8081")
.withProperty("schema.registry.url", "http://localhost:8081")
.withGroupId("test")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
我怀疑寄存器被使用了。
当我的应用程序是 运行 时,它在架构注册表日志中是沉默的。
如何检查我的应用程序是否使用了注册表?
如果不是 - 如何解决?
您使用了错误的 classes,因此您的属性可能会出现错误
您实际上需要在此处使用 KafkaAvroSerializer
作为 Producer
new StringSerializer,
new ByteArraySerializer)
和KafkaAvroDeserializer
这里的消费者
new StringDeserializer,
new ByteArrayDeserializer)
并尝试将 String, Array[Byte]
更改为 GenericRecord
或某些情况下-class 你从 Avro4s
我正在通过 avro4s 使用 avro。 这是我对 consumer/producer
的配置 def producerSettings(system: ActorSystem): ProducerSettings[String, Array[Byte]] = ProducerSettings(
system,
new StringSerializer,
new ByteArraySerializer)
.withBootstrapServers("localhost:9092")
.withProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
.withProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
.withProperty("key.converter.schema.registry.url", "http://localhost:8081")
.withProperty("value.converter.schema.registry.url", "http://localhost:8081")
.withProperty("schema.registry.url", "http://localhost:8081")
.withProperty("auto.create.topics.enable", "true")
def consumerSettings(system: ActorSystem): ConsumerSettings[String, Array[Byte]] =
ConsumerSettings(
system,
new StringDeserializer,
new ByteArrayDeserializer)
.withBootstrapServers("localhost:9092")
.withProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
.withProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
.withProperty("key.converter.schema.registry.url", "http://localhost:8081")
.withProperty("value.converter.schema.registry.url", "http://localhost:8081")
.withProperty("schema.registry.url", "http://localhost:8081")
.withGroupId("test")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
我怀疑寄存器被使用了。 当我的应用程序是 运行 时,它在架构注册表日志中是沉默的。
如何检查我的应用程序是否使用了注册表?
如果不是 - 如何解决?
您使用了错误的 classes,因此您的属性可能会出现错误
您实际上需要在此处使用 KafkaAvroSerializer
作为 Producer
new StringSerializer,
new ByteArraySerializer)
和KafkaAvroDeserializer
这里的消费者
new StringDeserializer,
new ByteArrayDeserializer)
并尝试将 String, Array[Byte]
更改为 GenericRecord
或某些情况下-class 你从 Avro4s