Kafka C# consumer JSON response deserialization issue

I've set up the Postgres connector following this guide:

https://medium.com/event-driven-utopia/configuring-debezium-to-capture-postgresql-changes-with-docker-compose-224742ca5372

That works well, and I can see the correct JSON in kafkacat.

I've set up a consumer in C#:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Newtonsoft.Json.Linq;

static async Task<ClientConfig> LoadConfig(string configPath)
{
    try
    {
        var cloudConfig =
           new string[] { "bootstrap.servers=localhost:9092", "session.timeout.ms=45000" }
            .Where(line => !line.StartsWith("#"))
            .ToDictionary(
                line => line.Substring(0, line.IndexOf('=')),
                line => line.Substring(line.IndexOf('=') + 1));

        var clientConfig = new ClientConfig(cloudConfig);

        return clientConfig;
    }
    catch (Exception e)
    {
        Console.WriteLine($"An error occurred reading the config file from '{configPath}': {e.Message}");
        System.Environment.Exit(1);
        return null; // avoid not-all-paths-return-value compiler error.
    }
}


static void Consume(string topic, ClientConfig config)
{
    var consumerConfig = new ConsumerConfig(config);
    consumerConfig.GroupId = "dotnet-example-group-1";
    consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
    consumerConfig.EnableAutoCommit = false;


    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true; // prevent the process from terminating.
        cts.Cancel();
    };



    using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
    {
        consumer.Subscribe(topic);
        var totalCount = 0;
        try
        {
            while (true)
            {
                var cr = consumer.Consume(cts.Token);
                var json = cr.Message.Value;
                Console.WriteLine(cr.Message.Value);
                //totalCount += JObject.Parse(cr.Message.Value).Value<int>("count");
                //Console.WriteLine($"Consumed record with key {cr.Message.Key} and value {cr.Message.Value}, and updated total count to {totalCount}");
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl-C was pressed.
        }
        finally
        {
            consumer.Close();
        }
    }
}

var topic = "postgres.public.shipments";
var configPath = "librdkafka.config";
var certDir = args.Length == 4 ? args[3] : null;

var config = await LoadConfig(configPath);

Consume(topic, config);

This also works, but where there should be JSON, I always get a strange format in cr.Message.Value: "[=27=][=27=][=27=][=27=]\u0002[=27=]\u0002��\u0003��\u0001\u0002\u00142021-01-21\u0002\u0012COMPLETED\u00161.4.2.Final\u0014postgresql\u0010postgres��ا��[=13=]\btrue\u0016shipment_db\fpublic\u0012shipments\u0002�\a\u0002���\u0016[=13=]\u0002r\u0002��ا�\0"

Where is the configuration error?

The linked blog uses Avro, not JSON. kcat deserializes Avro data and displays it as JSON when run with -s value=avro, as done in the post.
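In other words, the readable JSON you saw in kafkacat was likely being produced by Avro deserialization, along these lines (the broker and Schema Registry addresses are assumptions for a local Docker Compose setup):

```
# Consume the topic, letting kcat deserialize Avro values via Schema Registry
kcat -b localhost:9092 -t postgres.public.shipments \
     -s value=avro -r http://localhost:8081 -C
```

Reading the same topic with a plain string deserializer, as the C# code does, shows the raw Avro bytes instead, which is exactly the "strange format" in the question.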

If the data is Avro, then the error starts in your config variable/file, and the ConsumerBuilder value type should not be string but the generated Avro type.
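A minimal sketch of consuming Avro values with the Confluent deserializer, assuming the Confluent.SchemaRegistry.Serdes.Avro and Apache.Avro packages and a Schema Registry at localhost:8081 (addresses are assumptions; GenericRecord avoids needing generated classes):

```
using System;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "dotnet-example-group-1",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
using var consumer = new ConsumerBuilder<Ignore, GenericRecord>(consumerConfig)
    .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .Build();

consumer.Subscribe("postgres.public.shipments");
var cr = consumer.Consume();
Console.WriteLine(cr.Message.Value); // a GenericRecord, not a raw byte string
```

The deserializer uses the schema ID embedded in each Avro message to fetch the writer schema from the registry, which is why a Schema Registry URL is required.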

If you want JSON, the misconfiguration starts with Debezium's converter settings; in Docker Compose, you need these:

KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter 
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
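In a typical debezium/connect service in docker-compose.yml this would look something like the following (the schemas-enable flags are an assumption; without them, JsonConverter wraps every message in a schema envelope):

```
connect:
  image: debezium/connect:1.4
  environment:
    KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    # Assumption: disable the embedded schema envelope for lean JSON payloads
    KEY_CONVERTER_SCHEMAS_ENABLE: "false"
    VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
```

After changing converters, the connector will produce plain JSON, which the existing ConsumerBuilder<Ignore, string> setup can read as-is.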

Related - https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
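Once the connector emits real JSON, the commented-out JObject code in the question works again; a sketch of pulling a field out of Debezium's change-event envelope (the field names inside "after" are assumptions about the shipments table, for illustration only):

```
using System;
using Newtonsoft.Json.Linq;

// Example Debezium change-event value once JsonConverter is in use,
// shown here without the schema envelope (field names are assumptions)
var json = @"{""before"":null,""after"":{""shipment_id"":1,""order_date"":""2021-01-21"",""status"":""COMPLETED""},""op"":""c""}";

var evt = JObject.Parse(json);
var after = evt["after"];            // the row state after the change
Console.WriteLine(after?["status"]); // the new row's status field
```

Note that if schemas are left enabled, the interesting data sits one level deeper, under "payload" rather than at the top level.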