如何在 confluent kafka C# 中读取 GenericRecord 特定数据
How to read GenericRecord specific data in confluent kafka C#
这是我尝试读取 Avro 通用记录表单消费者的简单代码片段:
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
using (var consumer = new
ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
.SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
.SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe(topicName);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume();
Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
Console.WriteLine(consumeResult.Value.Schema);
Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// commit final offsets and leave the group.
consumer.Close();
}
}
如您所见,我可以记录架构,但不知道如何获取其数据值。
Console.WriteLine(consumeResult.Value.Schema);
Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);
虽然此对象中的模式和数据是这样的consumeResult.Value
:
{架构:{"type":"record","name":"User","namespace":"Confluent.Kafka.Examples.AvroSpecific","fields" :[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]},内容: { 名称: sfs, favorite_number: 41, favorite_color: 蓝色, }}
我想阅读内容数据。
the content
is not accessible
我认为 Value.contents
不是您想要的。特别是因为 属性 是 private
,正如你提到的
getter 的定义就像一本字典 - Source code
尝试consumeResult.Value["favorite_number"]
当您执行 consumeResult.Value.Schema["favorite_number"]
时,您会得到架构中的 Field
对象,而不是外部记录中字段的值。 - Source code
这是我尝试读取 Avro 通用记录表单消费者的简单代码片段:
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
using (var consumer = new
ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
.SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
.SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe(topicName);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume();
Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
Console.WriteLine(consumeResult.Value.Schema);
Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// commit final offsets and leave the group.
consumer.Close();
}
}
如您所见,我可以记录架构,但不知道如何获取其数据值。
Console.WriteLine(consumeResult.Value.Schema);
Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);
虽然此对象中的模式和数据是这样的consumeResult.Value
:
{架构:{"type":"record","name":"User","namespace":"Confluent.Kafka.Examples.AvroSpecific","fields" :[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]},内容: { 名称: sfs, favorite_number: 41, favorite_color: 蓝色, }}
我想阅读内容数据。
the
content
is not accessible
我认为 Value.contents
不是您想要的。特别是因为 属性 是 private
,正如你提到的
getter 的定义就像一本字典 - Source code
尝试consumeResult.Value["favorite_number"]
当您执行 consumeResult.Value.Schema["favorite_number"]
时,您会得到架构中的 Field
对象,而不是外部记录中字段的值。 - Source code