MassTransit 与 Kafka 和 NodaTime

MassTransit with Kafka and NodaTime

我在 C# 代码中有以下消息:

public interface ResourcePerformance
{
    public string ResourceId { get; }
    public List<TimeSection> TimeSections { get; }
}
public class TimeSection
{
    public Instant PeriodStart { get; set; }
    public Instant PeriodEnd { get; set; }
    public PerformanceStatus PerformanceStatus { get; set; }
    public Duration ProcessingTime { get; set; }
    public double Quantity { get; set; }
}

我想反序列化来自 Kafka 主题的此类消息。但是,从 NodaTime 库反序列化类型时会出现错误,例如:

Confluent.Kafka.ConsumeException: Local: Value deserialization error
 ---> Newtonsoft.Json.JsonSerializationException: Error converting value "2:00:00" to type 'NodaTime.Duration'. Path 'timeSections[0].processingTime', line 1, position 330.

我想这一定是 NodaTime 序列化的问题,因为当我将 NodaTime 类型更改为对象时,没有报告任何错误。我已经在配置的 RabbitMQ 部分为 json 序列化程序配置了 NodaTime,但我不知道如何在 Kafka 部分进行配置。目前我有以下配置:

services.AddMassTransit(x =>
        {
            x.SetKebabCaseEndpointNameFormatter();

            x.UsingRabbitMq((context, cfg) =>
            {

                cfg.ConfigureJsonSerializer( j => j.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb) );
                cfg.ConfigureEndpoints(context);
            });
            
            x.AddRider(rider =>
            {                                     
                rider.AddConsumer<ResourcePerformanceConsumer>();

                rider.UsingKafka((context, k) =>
                {
                                            
                    k.TopicEndpoint<string, ResourcePerformance>("performances-resource", kafkaConsumerGroup , e =>
                    {
                        e.AutoOffsetReset = AutoOffsetReset.Earliest;
                        e.CreateIfMissing(t =>
                        {
                            t.NumPartitions = 4; //number of partitions
                            t.ReplicationFactor = 1; //number of replicas
                        });
                        e.ConfigureConsumer<ResourcePerformanceConsumer>(context);
                    });
                });
            });
        });

        services.AddMassTransitHostedService();

我该怎么做才能正确反序列化它?

我在 MassTransit discord 上找到了答案。除了配置序列化器,还需要配置反序列化器:

cfg.ConfigureJsonDeserializer(j => j.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb) );