使用 MassTransit 和 Kafka 配置 Avro
Configure Avro with MassTransit and Kafka
如何使用 Avro 配置 MassTransit serialize/deserialize 在从 Confluent Kafka 主题生成和消费时使用 Avro?我看到 Avro serializer/deserializer 在包 Confluent.SchemaRegistry.Serdes
中。欢迎提供一些代码示例。
要将 MassTransit 配置为使用 Avro,我采用的方法是使用生成的 class 文件 (avrogen
),然后配置生产者和主题端点,如下所示:
首先,您需要为架构注册表创建客户端:
var schemaRegistryClient = new CachedSchemaRegistryClient(new Dictionary<string, string>
{
{"schema.registry.url", "localhost:8081"},
});
然后,您可以配置骑手:[=17=]
services.AddMassTransit(x =>
{
x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
x.AddRider(rider =>
{
rider.AddConsumer<KafkaMessageConsumer>();
rider.AddProducer<string, KafkaMessage>(Topic, context => context.MessageId.ToString())
.SetKeySerializer(new AvroSerializer<string>(schemaRegistryClient).AsSyncOverAsync())
.SetValueSerializer(new AvroSerializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
k.TopicEndpoint<string, KafkaMessage>("topic-name", "consumer-group", c =>
{
c.SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistryClient).AsSyncOverAsync());
c.SetValueDeserializer(new AvroDeserializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureConsumer<KafkaMessageConsumer>(context);
c.CreateIfMissing(m =>
{
m.NumPartitions = 2;
});
});
});
});
});
您可以查看 working unit test 以查看更多详细信息。我应该将其添加到文档中。
I wrote this just now to answer this question, I hadn't used Avro until an hour ago.
此外,我使用 this article from Confluent 和 运行 来解决问题。链接单元测试项目中的docker-compose.yml
配置了所有需要的服务。
如何使用 Avro 配置 MassTransit serialize/deserialize 在从 Confluent Kafka 主题生成和消费时使用 Avro?我看到 Avro serializer/deserializer 在包 Confluent.SchemaRegistry.Serdes
中。欢迎提供一些代码示例。
要将 MassTransit 配置为使用 Avro,我采用的方法是使用生成的 class 文件 (avrogen
),然后配置生产者和主题端点,如下所示:
首先,您需要为架构注册表创建客户端:
var schemaRegistryClient = new CachedSchemaRegistryClient(new Dictionary<string, string>
{
{"schema.registry.url", "localhost:8081"},
});
然后,您可以配置骑手:[=17=]
services.AddMassTransit(x =>
{
x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
x.AddRider(rider =>
{
rider.AddConsumer<KafkaMessageConsumer>();
rider.AddProducer<string, KafkaMessage>(Topic, context => context.MessageId.ToString())
.SetKeySerializer(new AvroSerializer<string>(schemaRegistryClient).AsSyncOverAsync())
.SetValueSerializer(new AvroSerializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
k.TopicEndpoint<string, KafkaMessage>("topic-name", "consumer-group", c =>
{
c.SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistryClient).AsSyncOverAsync());
c.SetValueDeserializer(new AvroDeserializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureConsumer<KafkaMessageConsumer>(context);
c.CreateIfMissing(m =>
{
m.NumPartitions = 2;
});
});
});
});
});
您可以查看 working unit test 以查看更多详细信息。我应该将其添加到文档中。
I wrote this just now to answer this question, I hadn't used Avro until an hour ago.
此外,我使用 this article from Confluent 和 运行 来解决问题。链接单元测试项目中的docker-compose.yml
配置了所有需要的服务。