c# confluent.kafka 无法使用 Protobuf-net 反序列化 protobuf 消息

c# confluent.kafka unable to deserialize protobuf message using Protobuf-net

继续我之前的问题 C# Confluent.Kafka SetValueDeserializer object deserialization,我尝试创建自定义反序列化器来反序列化 protobuf 消息,但出现此错误:

System.InvalidOperationException: 'Type is not expected, and no contract can be inferred: Ileco.Chimp.Proto.FinalValue'

出错的代码行是:

return Serializer.Deserialize<T>(stream);

这是我的消费者和反序列化器:

    /// <summary>
    /// Hosts the console Kafka consumer loop for <c>FinalValue</c> messages.
    /// </summary>
    class Worker
    {
        /// <summary>
        /// Subscribes to <paramref name="topic"/> and prints every received message until Ctrl+C is pressed.
        /// </summary>
        /// <param name="brokerList">Bootstrap server list passed to <c>BootstrapServers</c>.</param>
        /// <param name="connStr">Connection string, used as the SASL password.</param>
        /// <param name="consumergroup">Consumer group id.</param>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="cacertlocation">Path of the CA certificate passed to <c>SslCaLocation</c>.</param>
        public static void Consumer(string brokerList, string connStr, string consumergroup, string topic, string cacertlocation)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = brokerList,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SocketTimeoutMs = 60000,                //this corresponds to the Consumer config `request.timeout.ms`
                SessionTimeoutMs = 30000,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "$ConnectionString",
                SaslPassword = connStr,
                SslCaLocation = cacertlocation,
                GroupId = consumergroup,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                BrokerVersionFallback = "1.0.0",        //Event Hubs for Kafka Ecosystems supports Kafka v1.0+, a fallback to an older API will fail
                //Debug = "security,broker,protocol"    //Uncomment for librdkafka debugging information
            };

            using (var cts = new CancellationTokenSource())
            using (var consumer = new ConsumerBuilder<string, FinalValue>(config)
                .SetKeyDeserializer(Deserializers.Utf8)
                .SetValueDeserializer(new MyCustomDeserializer<FinalValue>())
                .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                .Build())
            {
                // Keep the process alive on Ctrl+C and let the loop below shut down in an orderly way.
                ConsoleCancelEventHandler onCancel = (_, e) => { e.Cancel = true; cts.Cancel(); };
                Console.CancelKeyPress += onCancel;

                try
                {
                    consumer.Subscribe(topic);

                    Console.WriteLine($"Consuming messages from topic: {topic}, broker(s): {brokerList}");

                    while (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            var msg = consumer.Consume(cts.Token);
                            Console.WriteLine($"Received: '{msg.Message.Value}'");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Consume error: {e.Error.Reason}");
                        }
                        catch (OperationCanceledException)
                        {
                            // Cancellation was requested via Ctrl+C: leave the loop instead of
                            // reporting it as an error and spinning forever.
                            break;
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine($"Error: {e.Message}");
                        }
                    }
                }
                finally
                {
                    // Unsubscribe first so a late Ctrl+C cannot touch the disposed token source.
                    Console.CancelKeyPress -= onCancel;

                    // Leave the consumer group cleanly before the consumer is disposed.
                    consumer.Close();
                }
            }
        }
    }

    /// <summary>
    /// Kafka value deserializer that decodes the message payload with <c>Serializer.Deserialize</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>Serializer</c> appears to be protobuf-net's. It can only materialize types it
    /// has a contract for; a class generated by Google's protoc is not such a type and fails with
    /// "Type is not expected, and no contract can be inferred". The serializer library and the
    /// code generator used for <typeparamref name="T"/> must match.
    /// </remarks>
    /// <typeparam name="T">Type of the deserialized message value.</typeparam>
    public class MyCustomDeserializer<T> : IDeserializer<T>
    {
        /// <summary>Deserializes one message value.</summary>
        /// <param name="data">Raw payload bytes.</param>
        /// <param name="isNull">True when the record carries a null value.</param>
        /// <param name="context">Serialization context supplied by the client (unused).</param>
        /// <returns>The decoded value, or <c>default(T)</c> when <paramref name="isNull"/> is true.</returns>
        public T Deserialize(ReadOnlySpan<byte> data, bool isNull, Confluent.Kafka.SerializationContext context)
        {
            // A null value has no payload to decode; do not hand an empty stream to the serializer.
            if (isNull)
            {
                return default(T);
            }

            // The span cannot be wrapped by a MemoryStream directly, so copy it to an array first.
            using (var stream = new MemoryStream(data.ToArray()))
            {
                return Serializer.Deserialize<T>(stream);
            }
        }
    }

FinalValue.proto

syntax = "proto3";

package ileco.chimp.proto;

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

option java_package = "ileco.chimp.proto";
option java_outer_classname = "FinalValueProtos";

// FinalValue: a timestamp, two unsigned ids, a wrapped double and a string identifier.
message FinalValue {
  // Well-known Timestamp message from google/protobuf/timestamp.proto.
  google.protobuf.Timestamp timestamp = 1;
  uint32 inputId = 2;
  // Wrapper message from google/protobuf/wrappers.proto rather than a bare double.
  google.protobuf.DoubleValue value = 3;
  uint32 sourceId = 4;
  // Presumably a GUID in string form; verify against the producer.
  string inputGuid = 5;
}

FinalValue.cs

// Generated by the protocol buffer compiler.  DO NOT EDIT!
// source: proto/FinalValue.proto
#pragma warning disable 1591, 0612, 3021
#region Designer generated code

using pb = global::Google.Protobuf;
using pbc = global::Google.Protobuf.Collections;
using pbr = global::Google.Protobuf.Reflection;
using scg = global::System.Collections.Generic;
namespace Ileco.Chimp.Proto {

  /// <summary>Holder for reflection information generated from proto/FinalValue.proto</summary>
  [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
  public static partial class FinalValueReflection {

    #region Descriptor
    /// <summary>File descriptor for proto/FinalValue.proto</summary>
    public static pbr::FileDescriptor Descriptor {
      get { return descriptor; }
    }
    // Assigned once by the static constructor below.
    private static pbr::FileDescriptor descriptor;

    // Builds the file descriptor from the embedded, base64-encoded descriptor data.
    static FinalValueReflection() {
      byte[] descriptorData = global::System.Convert.FromBase64String(
          string.Concat(
            "ChZwcm90by9GaW5hbFZhbHVlLnByb3RvEhFpbGVjby5jaGltcC5wcm90bxof",
            "Z29vZ2xlL3Byb3RvYnVmL3RpbWVzdGFtcC5wcm90bxoeZ29vZ2xlL3Byb3Rv",
            "YnVmL3dyYXBwZXJzLnByb3RvIp4BCgpGaW5hbFZhbHVlEi0KCXRpbWVzdGFt",
            "cBgBIAEoCzIaLmdvb2dsZS5wcm90b2J1Zi5UaW1lc3RhbXASDwoHaW5wdXRJ",
            "ZBgCIAEoDRIrCgV2YWx1ZRgDIAEoCzIcLmdvb2dsZS5wcm90b2J1Zi5Eb3Vi",
            "bGVWYWx1ZRIQCghzb3VyY2VJZBgEIAEoDRIRCglpbnB1dEd1aWQYBSABKAlC",
            "JQoRaWxlY28uY2hpbXAucHJvdG9CEEZpbmFsVmFsdWVQcm90b3NiBnByb3Rv",
            "Mw=="));
      // The second argument lists the descriptors of the imported files (Timestamp and Wrappers);
      // the third maps the FinalValue message to its CLR type, parser and property names.
      descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
          new pbr::FileDescriptor[] { global::Google.Protobuf.WellKnownTypes.TimestampReflection.Descriptor, global::Google.Protobuf.WellKnownTypes.WrappersReflection.Descriptor, },
          new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
            new pbr::GeneratedClrTypeInfo(typeof(global::Ileco.Chimp.Proto.FinalValue), global::Ileco.Chimp.Proto.FinalValue.Parser, new[]{ "Timestamp", "InputId", "Value", "SourceId", "InputGuid" }, null, null, null)
          }));
    }
    #endregion

  }
  #region Messages
  /// <summary>
  /// Generated message class for FinalValue. It implements the Google.Protobuf
  /// <c>IMessage&lt;FinalValue&gt;</c> interface and carries no protobuf-net contract attributes.
  /// </summary>
  [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
  public sealed partial class FinalValue : pb::IMessage<FinalValue> {
    private static readonly pb::MessageParser<FinalValue> _parser = new pb::MessageParser<FinalValue>(() => new FinalValue());
    /// <summary>Shared parser whose factory creates empty FinalValue instances.</summary>
    public static pb::MessageParser<FinalValue> Parser { get { return _parser; } }

    /// <summary>Descriptor of this message: the first message type of the file descriptor.</summary>
    public static pbr::MessageDescriptor Descriptor {
      get { return global::Ileco.Chimp.Proto.FinalValueReflection.Descriptor.MessageTypes[0]; }
    }

    pbr::MessageDescriptor pb::IMessage.Descriptor {
      get { return Descriptor; }
    }

    /// <summary>Creates an empty message and invokes the partial OnConstruction hook.</summary>
    public FinalValue() {
      OnConstruction();
    }

    partial void OnConstruction();

    /// <summary>
    /// Copy constructor: clones the Timestamp message when it is set and copies the
    /// remaining fields by value.
    /// </summary>
    public FinalValue(FinalValue other) : this() {
      Timestamp = other.timestamp_ != null ? other.Timestamp.Clone() : null;
      inputId_ = other.inputId_;
      Value = other.Value;
      sourceId_ = other.sourceId_;
      inputGuid_ = other.inputGuid_;
    }

    /// <summary>Returns a copy created through the copy constructor.</summary>
    public FinalValue Clone() {
      return new FinalValue(this);
    }

    /// <summary>Field number for the "timestamp" field.</summary>
    public const int TimestampFieldNumber = 1;
    private global::Google.Protobuf.WellKnownTypes.Timestamp timestamp_;
    public global::Google.Protobuf.WellKnownTypes.Timestamp Timestamp {
      get { return timestamp_; }
      set {
        timestamp_ = value;
      }
    }

    /// <summary>Field number for the "inputId" field.</summary>
    public const int InputIdFieldNumber = 2;
    private uint inputId_;
    public uint InputId {
      get { return inputId_; }
      set {
        inputId_ = value;
      }
    }

    /// <summary>Field number for the "value" field.</summary>
    public const int ValueFieldNumber = 3;
    // Codec for the wrapped double; 26 is the tag handled by the matching case in MergeFrom.
    private static readonly pb::FieldCodec<double?> _single_value_codec = pb::FieldCodec.ForStructWrapper<double>(26);
    private double? value_;
    public double? Value {
      get { return value_; }
      set {
        value_ = value;
      }
    }

    /// <summary>Field number for the "sourceId" field.</summary>
    public const int SourceIdFieldNumber = 4;
    private uint sourceId_;
    public uint SourceId {
      get { return sourceId_; }
      set {
        sourceId_ = value;
      }
    }

    /// <summary>Field number for the "inputGuid" field.</summary>
    public const int InputGuidFieldNumber = 5;
    private string inputGuid_ = "";
    public string InputGuid {
      get { return inputGuid_; }
      set {
        // Passes the value through a not-null check; the field starts as the empty string.
        inputGuid_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
      }
    }

    public override bool Equals(object other) {
      return Equals(other as FinalValue);
    }

    /// <summary>Field-by-field equality; false for null, true for the same reference.</summary>
    public bool Equals(FinalValue other) {
      if (ReferenceEquals(other, null)) {
        return false;
      }
      if (ReferenceEquals(other, this)) {
        return true;
      }
      if (!object.Equals(Timestamp, other.Timestamp)) return false;
      if (InputId != other.InputId) return false;
      if (Value != other.Value) return false;
      if (SourceId != other.SourceId) return false;
      if (InputGuid != other.InputGuid) return false;
      return true;
    }

    /// <summary>XORs the hash codes of the fields that are set (non-null, non-zero, non-empty).</summary>
    public override int GetHashCode() {
      int hash = 1;
      if (timestamp_ != null) hash ^= Timestamp.GetHashCode();
      if (InputId != 0) hash ^= InputId.GetHashCode();
      if (value_ != null) hash ^= Value.GetHashCode();
      if (SourceId != 0) hash ^= SourceId.GetHashCode();
      if (InputGuid.Length != 0) hash ^= InputGuid.GetHashCode();
      return hash;
    }

    public override string ToString() {
      return pb::JsonFormatter.ToDiagnosticString(this);
    }

    /// <summary>
    /// Writes the message to <paramref name="output"/>, skipping fields that are null,
    /// zero or empty.
    /// </summary>
    public void WriteTo(pb::CodedOutputStream output) {
      if (timestamp_ != null) {
        output.WriteRawTag(10);
        output.WriteMessage(Timestamp);
      }
      if (InputId != 0) {
        output.WriteRawTag(16);
        output.WriteUInt32(InputId);
      }
      if (value_ != null) {
        _single_value_codec.WriteTagAndValue(output, Value);
      }
      if (SourceId != 0) {
        output.WriteRawTag(32);
        output.WriteUInt32(SourceId);
      }
      if (InputGuid.Length != 0) {
        output.WriteRawTag(42);
        output.WriteString(InputGuid);
      }
    }

    /// <summary>
    /// Computes the size of the message; it checks the same field conditions as WriteTo
    /// and adds 1 per raw tag.
    /// </summary>
    public int CalculateSize() {
      int size = 0;
      if (timestamp_ != null) {
        size += 1 + pb::CodedOutputStream.ComputeMessageSize(Timestamp);
      }
      if (InputId != 0) {
        size += 1 + pb::CodedOutputStream.ComputeUInt32Size(InputId);
      }
      if (value_ != null) {
        size += _single_value_codec.CalculateSizeWithTag(Value);
      }
      if (SourceId != 0) {
        size += 1 + pb::CodedOutputStream.ComputeUInt32Size(SourceId);
      }
      if (InputGuid.Length != 0) {
        size += 1 + pb::CodedOutputStream.ComputeStringSize(InputGuid);
      }
      return size;
    }

    /// <summary>
    /// Merges the set fields of <paramref name="other"/> into this instance; a null
    /// argument is ignored.
    /// </summary>
    public void MergeFrom(FinalValue other) {
      if (other == null) {
        return;
      }
      if (other.timestamp_ != null) {
        // The nested message is merged into the existing one (created on demand), not replaced.
        if (timestamp_ == null) {
          timestamp_ = new global::Google.Protobuf.WellKnownTypes.Timestamp();
        }
        Timestamp.MergeFrom(other.Timestamp);
      }
      if (other.InputId != 0) {
        InputId = other.InputId;
      }
      if (other.value_ != null) {
        // A wrapped 0 only takes effect when no value is set yet.
        if (value_ == null || other.Value != 0D) {
          Value = other.Value;
        }
      }
      if (other.SourceId != 0) {
        SourceId = other.SourceId;
      }
      if (other.InputGuid.Length != 0) {
        InputGuid = other.InputGuid;
      }
    }

    /// <summary>
    /// Reads fields from <paramref name="input"/> until ReadTag returns 0; tags without a
    /// matching case are skipped.
    /// </summary>
    public void MergeFrom(pb::CodedInputStream input) {
      uint tag;
      while ((tag = input.ReadTag()) != 0) {
        switch(tag) {
          default:
            input.SkipLastField();
            break;
          case 10: {
            if (timestamp_ == null) {
              timestamp_ = new global::Google.Protobuf.WellKnownTypes.Timestamp();
            }
            input.ReadMessage(timestamp_);
            break;
          }
          case 16: {
            InputId = input.ReadUInt32();
            break;
          }
          case 26: {
            double? value = _single_value_codec.Read(input);
            // Same rule as MergeFrom(FinalValue): a wrapped 0 does not overwrite an existing value.
            if (value_ == null || value != 0D) {
              Value = value;
            }
            break;
          }
          case 32: {
            SourceId = input.ReadUInt32();
            break;
          }
          case 42: {
            InputGuid = input.ReadString();
            break;
          }
        }
      }
    }

  }

  #endregion

}

#endregion Designer generated code

正如我昨天指出的,你似乎使用了 Google 的 .proto 处理工具(protoc)来生成代码,但运行时使用的却是 protobuf-net;如果你想使用 protobuf-net,也有与 protobuf-net 库兼容的类似命令行/IDE/构建等工具,或者你可以使用 https://protogen.marcgravell.com/ 来临时生成代码(以避免必须安装任何东西)。或者:继续使用 Google 的架构工具,但同时改用 Google 的库。基本上:代码生成工具和运行时库必须匹配。

这里唯一的小问题是 protobuf-net 目前没有对 DoubleValue 的明确内置支持;供参考:可以简单地把它看作如下类型:

namespace Google.Protobuf.WellKnownTypes
{
    /// <summary>
    /// Hand-written protobuf-net contract standing in for the <c>DoubleValue</c> wrapper message.
    /// </summary>
    [ProtoContract]
    public sealed class DoubleValue
    {
        /// <summary>The wrapped double, mapped to field number 1.</summary>
        [ProtoMember(1)]
        public double Value { get; set; }
    }
}

我或许应该找时间把 wrappers.proto 中的所有类型都拿过来,并允许它们表示为 double?、float?、long? 等——但这需要一个额外的标记,因为 Nullable<T> 已经被处理,只是含义不同(即 .proto 术语中的 optional)。