How to deserialize Avro messages in Kafka using C#

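Hi, I am using Confluent Kafka. I have a consumer that returns a generic record, and I want to deserialize it. I couldn't find any way to do this. I can do it manually, field by field, for example: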

 object options = ((GenericRecord)response.Message.Value["Product"])["Options"];

I found one approach here

But how can I convert my schema into a stream? I would like to know whether there is any way to deserialize directly into our C# model. Any help would be greatly appreciated. Thanks.

Assuming you are using the Confluent .NET client (confluent-kafka-dotnet), you can use AvroDeserializer:

(async) Avro deserializer. Use this deserializer with GenericRecord, types generated using the avrogen.exe tool or one of the following primitive types: int, long, float, double, boolean, string, byte[].

Example:

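// Namespaces come from the Apache.Avro, Confluent.Kafka and Confluent.SchemaRegistry.Serdes packages.
using System;
using System.Threading;
using System.Threading.Tasks;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// bootstrapServers, schemaRegistryUrl, groupName and topicName are assumed to be defined elsewhere.
// Cancelling cts (for example from a Ctrl+C handler) ends the consume loop.
var cts = new CancellationTokenSource();

var consumeTask = Task.Run(() => {
  using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig {
    // Newer client versions name this property Url.
    SchemaRegistryUrl = schemaRegistryUrl
  }))
  using (var consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig {
      BootstrapServers = bootstrapServers,
      GroupId = groupName
    })
    // The Avro deserializers are async; AsSyncOverAsync adapts them for the synchronous consumer.
    .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
    .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .Build()) {
    consumer.Subscribe(topicName);

    try {
      while (true) {
        try {
          var consumeResult = consumer.Consume(cts.Token);

          Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Message.Value}");
        }
        catch (ConsumeException e) {
          Console.WriteLine($"Consume error: {e.Error.Reason}");
        }
      }
    }
    catch (OperationCanceledException) {
      // Commit final offsets and leave the group.
      consumer.Close();
    }
  }
});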
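
To get a C# model instead of a GenericRecord, the same deserializer can be given a class generated by avrogen, as the quoted documentation suggests. The following is only a sketch under some assumptions: `Product` is a hypothetical class generated from the topic's value schema (for instance with something like `avrogen -s Product.avsc .`), its `Options` property is likewise hypothetical, and `TypedConsumerSketch.Run` is just an illustrative wrapper. It also assumes a recent client version in which the Schema Registry URL property is named `Url` rather than `SchemaRegistryUrl`.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

public static class TypedConsumerSketch
{
    // Product is a hypothetical avrogen-generated class; it is not defined in this snippet.
    public static void Run(string bootstrapServers, string schemaRegistryUrl,
                           string groupName, string topicName, CancellationToken token)
    {
        // Assumption: recent client versions expose the registry address as Url.
        var registryConfig = new SchemaRegistryConfig { Url = schemaRegistryUrl };
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupName
        };

        using (var schemaRegistry = new CachedSchemaRegistryClient(registryConfig))
        using (var consumer = new ConsumerBuilder<string, Product>(consumerConfig)
            .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
            // Using the generated type here yields Product instances instead of GenericRecord.
            .SetValueDeserializer(new AvroDeserializer<Product>(schemaRegistry).AsSyncOverAsync())
            .Build())
        {
            consumer.Subscribe(topicName);
            try
            {
                while (true)
                {
                    var result = consumer.Consume(token);
                    Product product = result.Message.Value;
                    // Fields are now ordinary typed properties (Options is hypothetical).
                    Console.WriteLine($"Options: {product.Options}");
                }
            }
            catch (OperationCanceledException)
            {
                // Leave the group cleanly once the token is cancelled.
                consumer.Close();
            }
        }
    }
}
```

With this variant there is no need to cast nested fields by hand as in the question; the trade-off is that the generated class has to be regenerated whenever the schema changes.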