Masstransit IReceiveObserver 无法获取消息正文

Masstransit IReceiveObserver can not get message body

我正在尝试从我的服务使用的所有消息中获取特定信息,我不想在我拥有的每个消费者中重复相同的代码。

所以我创建了一个 class 来实现 IReceiveObserver 并在内部 public Task PreReceive(ReceiveContext context) 我试图获取消息正文,但我做不到。

  public Task PreReceive(ReceiveContext context)
        {
            var body = context.GetBody();
            return null;
        }

我假设 body 将包含消息正文,所以我尝试使用以下方法对其进行反序列化:

   public static object DeserializeFromStream(MemoryStream stream)
        {
            IFormatter formatter = new BinaryFormatter();
            stream.Seek(0, SeekOrigin.Begin);
            object o = formatter.Deserialize(stream);
            return o;
        }
var Deserialized =(GeneralMessageType) DeserializeFromStream((MemoryStream)body);

我收到以下错误:

System.Runtime.Serialization.SerializationException: 'The input stream is not a valid binary format.

这是实现我需要的正确方法吗?我认为我采用了正确的方法,但我在提取邮件正文时遇到问题。

更新:

邮件正文使用 Console.WriteLine("{0}", (new StreamReader(stream)).ReadToEnd());:

{\r\n \"messageId\": \"01110000-5d1b-0015-b7c9-08d5479e73b2\",\r\n \"correlationId\": \"da197981-a10e-43a9-a53b-67b7a3261511\",\r\n \"conversationId\": \"01110000-5d1b-0015-80d3-08d5479e73a6\",\r\n \"sourceAddress\": \"queue URL",\r\n \"destinationAddress\": \"receiverURL",\r\n \"messageType\": [\r\n \"urn:message:MessageType"\r\n ],\r\n \"message\": {\r\n \"correlationId\": \"da197981-a10e-43a9-a53b-67b7a3261511\",\r\n \"Id\": \"62d2bfff-7e20-4b51-b431-7d46e44abfc0\",\r\n \"Name\": \"'''yry'''\",\r\n \"Code\": \"ryryyy\",\r\n \"isActive\": true,\r\n \"username\": \"username\",\r\n \"password\": \"*****\",\r\n \"isUserActive\": true\r\n },\r\n \"headers\": {},\r\n \"host\": {\r\n \"machineName\": \"myMachine\",\r\n
\"processName\": \"w3wp\",\r\n \"processId\": 4568,\r\n
\"assembly\": \"MassTransit\",\r\n \"assemblyVersion\": \"3.5.7.1082\",\r\n \"frameworkVersion\": \"4.0.30319.42000\",\r\n \"massTransitVersion\": \"3.5.7.1082\",\r\n
\"operatingSystemVersion\": \"Microsoft Windows NT 10.0.16299.0\"\r\n }\r\n}"

因此您无需为 BinaryFormatter 操心。相反,您应该使用 System.Runtime.Serialization.DataContractJsonSerializer,因为您的流内容以 JSON 的形式出现。您可以使用以下代码执行此操作:

DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(MassTransit));
MassTransit obj = (MassTransit)serializer.ReadObject(stream);

这是假设您尝试反序列化的对象属于 MassTransit 类型。如果不是,则只需将其替换为您要反序列化的实例的任何 class。

您可以将其设为通用方法,如下所示:

public static T JsonDeserializer<T>(MemoryStream strm)
{
    DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(T));
    return (T)serializer.ReadObject(strm);
}

只要您想从 JSON 流中反序列化一个对象,只需调用它即可。

想通了:

正如您在问题中看到的那样 body context.GetBody(); 给出了 json 包含主机信息、Headers 和消息 body 除了一些其他属性。

我所做的是用“删除”\r\n"s and replacing \" 然后继续 到 https://jsonutils.com/,粘贴生成的 Json,它将把它转换成你应该反序列化到的 类。

问题是 body 包含的信息比发送的消息多得多,所有信息都是由 masstransit 和 rabbitmq 添加的。

 public class Message
{
    //Message attributes
}

public class Headers
{
}

public class Host
{
    public string machineName { get; set; }
    public string processName { get; set; }
    public int processId { get; set; }
    public string assembly { get; set; }
    public string assemblyVersion { get; set; }
    public string frameworkVersion { get; set; }
    public string massTransitVersion { get; set; }
    public string operatingSystemVersion { get; set; }
}

public class TotalMessage
{
    public string messageId { get; set; }
    public string correlationId { get; set; }
    public string conversationId { get; set; }
    public string sourceAddress { get; set; }
    public string destinationAddress { get; set; }
    public IList<string> messageType { get; set; }
    public Message message { get; set; }
    public Headers headers { get; set; }
    public Host host { get; set; }
}

反序列化你可以使用:

  public static object DeserializeFromStream(MemoryStream stream)
        {
            IFormatter formatter = new BinaryFormatter();
            stream.Seek(0, SeekOrigin.Begin);
            object o = formatter.Deserialize(stream);
            return o;
        }

 var body = context.GetBody();
 var totalMessage= JsonDeserializer<TotalMessage>((MemoryStream)body);

获取消息body:totalMessage.message

更新:

当然,您可以通过以下方式反序列化为消息 body:

public class TotalMessage
{
    public Message message { get; set; }
}