Kafka C# consumer: JSON response deserialization issue
I have set up a Postgres connector following this guide:
This works well, and I can see the correct JSON in kafkacat.
I have set up a consumer in C#:
    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Newtonsoft.Json.Linq;

    static async Task<ClientConfig> LoadConfig(string configPath)
    {
        try
        {
            // The settings are hard-coded here; configPath is only used in the error message.
            var cloudConfig =
                new string[] { "bootstrap.servers=localhost:9092", "session.timeout.ms=45000" }
                    .Where(line => !line.StartsWith("#"))
                    .ToDictionary(
                        line => line.Substring(0, line.IndexOf('=')),
                        line => line.Substring(line.IndexOf('=') + 1));
            var clientConfig = new ClientConfig(cloudConfig);
            return clientConfig;
        }
        catch (Exception e)
        {
            Console.WriteLine($"An error occurred reading the config file from '{configPath}': {e.Message}");
            System.Environment.Exit(1);
            return null; // avoid not-all-paths-return-value compiler error.
        }
    }

    static void Consume(string topic, ClientConfig config)
    {
        var consumerConfig = new ConsumerConfig(config);
        consumerConfig.GroupId = "dotnet-example-group-1";
        consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
        consumerConfig.EnableAutoCommit = false;

        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
        };

        // The value type is string, so the raw message bytes are decoded as UTF-8 text.
        using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
        {
            consumer.Subscribe(topic);
            var totalCount = 0;
            try
            {
                while (true)
                {
                    var cr = consumer.Consume(cts.Token);
                    var json = cr.Message.Value;
                    Console.WriteLine(cr.Message.Value);
                    //totalCount += JObject.Parse(cr.Message.Value).Value<int>("count");
                    //Console.WriteLine($"Consumed record with key {cr.Message.Key} and value {cr.Message.Value}, and updated total count to {totalCount}");
                }
            }
            catch (OperationCanceledException)
            {
                // Ctrl-C was pressed.
            }
            finally
            {
                consumer.Close();
            }
        }
    }

    var topic = "postgres.public.shipments";
    var configPath = "librdkafka.config";
    var certDir = args.Length == 4 ? args[3] : null;
    var config = await LoadConfig(configPath);
    Consume(topic, config);
This also works, but cr.Message.Value always contains a strange format where I expect JSON:
    "\0\0\0\0\u0002\0\u0002��\u0003��\u0001\u0002\u00142021-01-21\u0002\u0012COMPLETED\u00161.4.2.Final\u0014postgresql\u0010postgres��ا��\0\btrue\u0016shipment_db\fpublic\u0012shipments\u0002�\a\u0002���\u0016\0\u0002r\u0002��ا�
    \0"
Where is the misconfiguration?
The linked blog uses Avro, not JSON. kcat deserializes the Avro data in order to show it as JSON, as written in the post: -s value=avro.
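Avro values written through Schema Registry use the Confluent wire format: a magic byte of 0, a 4-byte big-endian schema ID, then the Avro-encoded body. Decoding those bytes as a string is what produces the control characters. A minimal sketch for checking raw bytes, assuming the consumer is built with byte[] values (ConsumerBuilder<Ignore, byte[]>); the helper name TryReadSchemaId and the sample bytes are made up for illustration:

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper: returns true and the schema ID when the payload starts
// with the Confluent wire-format header (magic byte 0 + 4-byte big-endian ID).
static bool TryReadSchemaId(byte[] payload, out int schemaId)
{
    schemaId = -1;
    if (payload == null || payload.Length < 5 || payload[0] != 0)
    {
        return false;
    }
    schemaId = BinaryPrimitives.ReadInt32BigEndian(payload.AsSpan(1, 4));
    return true;
}

// Sample bytes made up for illustration: header for schema ID 2, then two body bytes.
byte[] sample = { 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x02 };

if (TryReadSchemaId(sample, out int id))
{
    Console.WriteLine($"Looks like Schema Registry framing, schema ID {id}");
}
else
{
    Console.WriteLine("No wire-format header; the value may be plain JSON or text");
}
```

A value that starts with '{' fails the magic-byte check, so the same helper gives a quick way to tell the two encodings apart while debugging.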
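If the data is Avro, then the errors start with your config variable/file, which has no Schema Registry settings, and the ConsumerBuilder value type should not be string but a generated Avro type.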
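If the topic stays in Avro, a consumer along these lines may work. This is a sketch, not the asker's code. It assumes the Confluent.SchemaRegistry.Serdes.Avro NuGet package and a Schema Registry at http://localhost:8081 (a guess; use whatever URL the connector was configured with). It also swaps the generated Avro type for GenericRecord, so no code generation step is needed:

```csharp
using System;
using System.Threading;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "dotnet-example-group-1",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

// Assumed URL; the question does not say where Schema Registry runs.
var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // prevent the process from terminating.
    cts.Cancel();
};

using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

// The Avro deserializer is async; AsSyncOverAsync adapts it for the consumer.
using var consumer = new ConsumerBuilder<Ignore, GenericRecord>(consumerConfig)
    .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .Build();

consumer.Subscribe("postgres.public.shipments");
try
{
    while (true)
    {
        var cr = consumer.Consume(cts.Token);
        GenericRecord record = cr.Message.Value;

        // Values can be null (for example tombstones after a delete).
        if (record != null && record.TryGetValue("op", out object op))
        {
            Console.WriteLine($"Change event with op={op}");
        }
    }
}
catch (OperationCanceledException)
{
    // Ctrl-C was pressed.
}
finally
{
    consumer.Close();
}
```

The "op" field is part of the Debezium change event envelope; other fields can be read the same way with TryGetValue.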
If you want JSON, the misconfiguration starts with Debezium's converter settings; in Docker Compose, you need these:
    KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
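With JsonConverter in place, the string consumer from the question should receive JSON text. By default JsonConverter wraps each value in a {"schema": ..., "payload": ...} envelope (schemas.enable=true), so the change event sits under "payload". A minimal parsing sketch, using System.Text.Json instead of the question's Newtonsoft.Json to stay dependency-free; the helper name GetPayload and the sample value (including the "status" column) are made up for illustration:

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: returns the change event body, unwrapping the
// {"schema": ..., "payload": ...} envelope when the converter emits one.
static JsonElement GetPayload(JsonElement root)
{
    return root.ValueKind == JsonValueKind.Object &&
           root.TryGetProperty("payload", out JsonElement payload)
        ? payload
        : root;
}

// Made-up sample value; real Debezium events carry more fields.
string value = "{\"schema\":null,\"payload\":{\"op\":\"c\",\"after\":{\"status\":\"COMPLETED\"}}}";

using JsonDocument doc = JsonDocument.Parse(value);
JsonElement change = GetPayload(doc.RootElement);

string op = change.GetProperty("op").GetString();
string status = change.GetProperty("after").GetProperty("status").GetString();
Console.WriteLine($"op={op}, status={status}");
```

In the consumer loop, cr.Message.Value would take the place of the sample string.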
Related: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/