In confluent-dotnet-kafka how to pass an object in Message Producer in .Net

I am using .NET Core 3.1 with the Confluent Kafka library:

using Confluent.Kafka;

I am implementing a Kafka system, creating both a producer and a consumer.

I know I can easily create a message and send it to a topic from the producer with code like the following:

using (var producer = new ProducerBuilder<long, string>(config).Build())
{
    for (var i = 0; i < 1000; i++)
    {
        var deliveryReport = await producer.ProduceAsync(
            kafkaTopic, new Message<long, string> { Key = i, Value = "lorem ipsum " + i });
    }
}

That works fine.

But what if I want to use an object instead? The following doesn't work:

using (var producer = new ProducerBuilder<long, User>(config).Build())
{
    for (var i = 0; i < 1000; i++)
    {
        var user = new User() { Id = i, Name = "random" };
        var deliveryReport = await producer.ProduceAsync(
            kafkaTopic, new Message<long, User> { Key = i, Value = user });
    }
}

What is it missing? I've heard there is a way to do something like this, but I can't find it.

In that case, you will have to serialize your object. As far as I know, JSON payloads are out of scope for the library you've been using: it ships serializers for primitives, but not for arbitrary classes.
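
The library does give you the hook for doing it yourself, though: implement Confluent.Kafka's ISerializer<T> and register it on the builder. Here's a minimal sketch using System.Text.Json (the SimpleJsonSerializer name and the User shape are mine for illustration, not part of the library):

using System.Text.Json;
using Confluent.Kafka;

public class User
{
    public long Id { get; set; }
    public string Name { get; set; }
}

// Illustrative serializer: encodes any T as UTF-8 JSON bytes.
public class SimpleJsonSerializer<T> : ISerializer<T>
{
    public byte[] Serialize(T data, SerializationContext context)
        => JsonSerializer.SerializeToUtf8Bytes(data);
}

// The long key still uses the built-in serializer; only the value needs a custom one.
using (var producer = new ProducerBuilder<long, User>(config)
    .SetValueSerializer(new SimpleJsonSerializer<User>())
    .Build())
{
    var user = new User { Id = 1, Name = "random" };
    await producer.ProduceAsync(kafkaTopic,
        new Message<long, User> { Key = user.Id, Value = user });
}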

Alternatively, you can use Avro (but you will need a Schema Registry). Here is an example:

using Avro;
using Avro.Generic;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;
using Confluent.SchemaRegistry;
using System;
using System.Threading;
using System.Threading.Tasks;


namespace Confluent.Kafka.Examples.AvroGeneric
{
    class Program
    {
        static async Task Main(string[] args)
        {
            if (args.Length != 3)
            {
                Console.WriteLine("Usage: .. bootstrapServers schemaRegistryUrl topicName");
                return;
            }

            string bootstrapServers = args[0];
            string schemaRegistryUrl = args[1];
            string topicName = args[2];
            string groupName = "avro-generic-example-group";

            // var s = (RecordSchema)RecordSchema.Parse(File.ReadAllText("my-schema.json"));
            var s = (RecordSchema)RecordSchema.Parse(
                @"{
                    ""namespace"": ""Confluent.Kafka.Examples.AvroSpecific"",
                    ""type"": ""record"",
                    ""name"": ""User"",
                    ""fields"": [
                        {""name"": ""name"", ""type"": ""string""},
                        {""name"": ""favorite_number"",  ""type"": [""int"", ""null""]},
                        {""name"": ""favorite_color"", ""type"": [""string"", ""null""]}
                    ]
                  }"
            );

            CancellationTokenSource cts = new CancellationTokenSource();
            var consumeTask = Task.Run(() =>
            {
                using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
                using (var consumer =
                    new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
                        .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                        .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
                        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                        .Build())
                {
                    consumer.Subscribe(topicName);

                    try
                    {
                        while (true)
                        {
                            try
                            {
                                var consumeResult = consumer.Consume(cts.Token);

                                Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Message.Value}");
                            }
                            catch (ConsumeException e)
                            {
                                Console.WriteLine($"Consume error: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // commit final offsets and leave the group.
                        consumer.Close();
                    }
                }
            });

            using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
            using (var producer =
                new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
                    .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
                    .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
                    .Build())
            {
                Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

                int i = 0;
                string text;
                while ((text = Console.ReadLine()) != "q")
                {
                    var record = new GenericRecord(s);
                    record.Add("name", text);
                    record.Add("favorite_number", i++);
                    record.Add("favorite_color", "blue");

                    Console.WriteLine(await producer
                        .ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
                        .ContinueWith(task => task.IsFaulted
                            ? $"error producing message: {task.Exception.Message}"
                            : $"produced to: {task.Result.TopicPartitionOffset}"));
                }
            }

            cts.Cancel();
            await consumeTask;   // let the consumer loop exit and commit final offsets before the process ends
        }
    }
}
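
If you'd rather produce the typed User from your question than a GenericRecord, the same Schema Registry serializer accepts a class generated from the schema with the avrogen tool (the generated class implements ISpecificRecord). A sketch reusing schemaRegistryUrl, bootstrapServers and topicName from above, and assuming the generated properties keep the schema's field names as in the AvroSpecific example:

using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
using (var producer =
    new ProducerBuilder<string, User>(new ProducerConfig { BootstrapServers = bootstrapServers })
        .SetValueSerializer(new AvroSerializer<User>(schemaRegistry))
        .Build())
{
    // avrogen-generated class: properties mirror the Avro schema fields.
    var user = new User { name = "random", favorite_number = 42, favorite_color = "blue" };
    await producer.ProduceAsync(topicName,
        new Message<string, User> { Key = user.name, Value = user });
}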

Kafka accepts bytes, not "objects", in every client. You will need to implement your own serializer, or use the ones backed by the Confluent Schema Registry.
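
A hand-rolled value deserializer for the consumer side is equally small: implement IDeserializer<T>. A minimal sketch with System.Text.Json (SimpleJsonDeserializer, User, consumerConfig and kafkaTopic are illustrative names, not library types):

using System;
using System.Text.Json;
using Confluent.Kafka;

// Illustrative deserializer: parses UTF-8 JSON bytes back into T.
public class SimpleJsonDeserializer<T> : IDeserializer<T>
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        => isNull ? default : JsonSerializer.Deserialize<T>(data);
}

// Usage:
using (var consumer = new ConsumerBuilder<long, User>(consumerConfig)
    .SetValueDeserializer(new SimpleJsonDeserializer<User>())
    .Build())
{
    consumer.Subscribe(kafkaTopic);
    var result = consumer.Consume();
    Console.WriteLine(result.Message.Value.Name);
}

For the Schema Registry-backed serializers, the repo has complete examples: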

JSON 示例 - https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/JsonSerialization

Avro 示例 - https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroSpecific

Protobuf 示例 - https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/Protobuf