avro 序列化的 C# 融合 kafka 问题
C# confluent kafka problem with avro serialization
我正在使用 docker 到 运行 kafka 和来自 https://github.com/confluentinc/cp-all-in-one 的其他服务
在我的测试项目中使用 kafka、avro 和 schemaRegistry 的融合 nuget 包。
如果要发送 json 消息我到现在为止没有问题,但我正在努力发送 avro 序列化消息。
我看到了 https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroSpecific 示例,我尝试以相同的方式进行操作,但最终我得到如下异常:
Local: Value serialization error
at Confluent.Kafka.Producer2.<ProduceAsync>d__52.MoveNext() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter
1.GetResult()
at Kafka_producer.KafkaService.d__10.MoveNext() in
C:\Users\lu95eb\source\repos\Kafka_playground\Kafka producer\KafkaService.cs:line 126
有内部异常
Object reference not set to an instance of an object.
at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize) at Confluent.SchemaRegistry.Serdes.AvroSerializer
1.d__6.MoveNext()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Confluent.Kafka.Producer`2.d__52.MoveNext()
这是我的 SpecificRecord class
public class UserInfo : ISpecificRecord
{
public string Name { get; set; }
public int[] Numbers { get; set; }
public Schema Schema => Schema.Parse(@"
{
""name"": ""UserInfo"",
""type"": ""record"",
""namespace"": ""kafka"",
""fields"": [
{
""name"": ""Name"",
""type"": ""string""
},
{
""name"": ""Numbers"",
""type"": {
""type"": ""array"",
""items"": ""int""
}
}
]
}
");
public object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return Name;
case 1: return Numbers;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
}
}
public void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: Name = (string)fieldValue; break;
case 1: Numbers = (int[])fieldValue; break;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
}
}
}
以及用于发送消息的方法
private async Task SendSpecificRecord(UserInfo userInfo)
{
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
using (var producer =
new ProducerBuilder<string, UserInfo>(new ProducerConfig { BootstrapServers = _brokerUrl })
.SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
.SetValueSerializer(new AvroSerializer<UserInfo>(schemaRegistry))
.Build())
{
var message = new Message<string, UserInfo>
{
Key = userInfo.Name,
Value = userInfo
};
await producer.ProduceAsync(SpecificTopic, message);
}
}
KafkaService.cs:第 126 行是 await producer.ProduceAsync(SpecificTopic, message);
就像我在开始时写的那样,我对 schemaRegistry 没有任何问题-我注册了模式并且它们可以正常工作 json,我对主题、代理、消费者或其他方面没有问题。
如果有人能指出我做错了什么,我将不胜感激。
提前谢谢你。
如果有人对解决方案感到好奇(我无法想象有人会怎样 ;))
然后我写了 'custom' avro 序列化器和反序列化器,并且工作起来很有魅力。
public class CustomAvroSerializer<T> : IAsyncSerializer<T>
where T : class, ISpecificRecord
{
public Task<byte[]> SerializeAsync(T data, SerializationContext context)
{
return Task.Run(() =>
{
using (var ms = new MemoryStream())
{
var enc = new BinaryEncoder(ms);
var writer = new SpecificDefaultWriter(data.Schema);
writer.Write(data, enc);
return ms.ToArray();
}
});
}
}
public class CustomAvroDeserializer<T> : IDeserializer<T>
where T : class, ISpecificRecord
{
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
using (var ms = new MemoryStream(data.ToArray()))
{
var dec = new BinaryDecoder(ms);
var regenObj = (T)Activator.CreateInstance(typeof(T));
var reader = new SpecificDefaultReader(regenObj.Schema, regenObj.Schema);
reader.Read(regenObj, dec);
return regenObj;
}
}
}
我遇到了同样的问题,在查看 github 上的库代码后能够解决它。
似乎架构注册表需要在您的 class 实现名为 _SCHEMA 的 ISpecificRecord 中的静态字段。
所以如果你添加一个
public 静态 _SCHEMA = Schema.Parse(....
并更改您的 public 架构 => UserInfo._SCHEMA;
它会在没有您的解决方法的情况下工作,它只是忽略模式注册表。
我正在使用 docker 到 运行 kafka 和来自 https://github.com/confluentinc/cp-all-in-one 的其他服务 在我的测试项目中使用 kafka、avro 和 schemaRegistry 的融合 nuget 包。
如果要发送 json 消息我到现在为止没有问题,但我正在努力发送 avro 序列化消息。
我看到了 https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroSpecific 示例,我尝试以相同的方式进行操作,但最终我得到如下异常:
Local: Value serialization error
at Confluent.Kafka.Producer2.<ProduceAsync>d__52.MoveNext() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter
1.GetResult() at Kafka_producer.KafkaService.d__10.MoveNext() in C:\Users\lu95eb\source\repos\Kafka_playground\Kafka producer\KafkaService.cs:line 126
有内部异常
Object reference not set to an instance of an object.
at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize) at Confluent.SchemaRegistry.Serdes.AvroSerializer
1.d__6.MoveNext() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task) at Confluent.Kafka.Producer`2.d__52.MoveNext()
这是我的 SpecificRecord class
public class UserInfo : ISpecificRecord
{
public string Name { get; set; }
public int[] Numbers { get; set; }
public Schema Schema => Schema.Parse(@"
{
""name"": ""UserInfo"",
""type"": ""record"",
""namespace"": ""kafka"",
""fields"": [
{
""name"": ""Name"",
""type"": ""string""
},
{
""name"": ""Numbers"",
""type"": {
""type"": ""array"",
""items"": ""int""
}
}
]
}
");
public object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return Name;
case 1: return Numbers;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
}
}
public void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: Name = (string)fieldValue; break;
case 1: Numbers = (int[])fieldValue; break;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
}
}
}
以及用于发送消息的方法
private async Task SendSpecificRecord(UserInfo userInfo)
{
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
using (var producer =
new ProducerBuilder<string, UserInfo>(new ProducerConfig { BootstrapServers = _brokerUrl })
.SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
.SetValueSerializer(new AvroSerializer<UserInfo>(schemaRegistry))
.Build())
{
var message = new Message<string, UserInfo>
{
Key = userInfo.Name,
Value = userInfo
};
await producer.ProduceAsync(SpecificTopic, message);
}
}
KafkaService.cs:第 126 行是 await producer.ProduceAsync(SpecificTopic, message);
就像我在开始时写的那样,我对 schemaRegistry 没有任何问题-我注册了模式并且它们可以正常工作 json,我对主题、代理、消费者或其他方面没有问题。
如果有人能指出我做错了什么,我将不胜感激。 提前谢谢你。
如果有人对解决方案感到好奇(我无法想象有人会怎样 ;)) 然后我写了 'custom' avro 序列化器和反序列化器,并且工作起来很有魅力。
public class CustomAvroSerializer<T> : IAsyncSerializer<T>
where T : class, ISpecificRecord
{
public Task<byte[]> SerializeAsync(T data, SerializationContext context)
{
return Task.Run(() =>
{
using (var ms = new MemoryStream())
{
var enc = new BinaryEncoder(ms);
var writer = new SpecificDefaultWriter(data.Schema);
writer.Write(data, enc);
return ms.ToArray();
}
});
}
}
public class CustomAvroDeserializer<T> : IDeserializer<T>
where T : class, ISpecificRecord
{
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
using (var ms = new MemoryStream(data.ToArray()))
{
var dec = new BinaryDecoder(ms);
var regenObj = (T)Activator.CreateInstance(typeof(T));
var reader = new SpecificDefaultReader(regenObj.Schema, regenObj.Schema);
reader.Read(regenObj, dec);
return regenObj;
}
}
}
我遇到了同样的问题,在查看 github 上的库代码后能够解决它。 似乎架构注册表需要在您的 class 实现名为 _SCHEMA 的 ISpecificRecord 中的静态字段。
所以如果你添加一个 public 静态 _SCHEMA = Schema.Parse(....
并更改您的 public 架构 => UserInfo._SCHEMA;
它会在没有您的解决方法的情况下工作,它只是忽略模式注册表。