Masstransit IReceiveObserver 无法获取消息正文
Masstransit IReceiveObserver can not get message body
我正在尝试从我的服务使用的所有消息中获取特定信息,我不想在我拥有的每个消费者中重复相同的代码。
所以我创建了一个 class 来实现 IReceiveObserver 并在内部 public Task PreReceive(ReceiveContext context)
我试图获取消息正文,但我做不到。
public Task PreReceive(ReceiveContext context)
{
var body = context.GetBody();
return null;
}
我假设 body 将包含消息正文,所以我尝试使用以下方法对其进行反序列化:
public static object DeserializeFromStream(MemoryStream stream)
{
IFormatter formatter = new BinaryFormatter();
stream.Seek(0, SeekOrigin.Begin);
object o = formatter.Deserialize(stream);
return o;
}
var Deserialized =(GeneralMessageType) DeserializeFromStream((MemoryStream)body);
我收到以下错误:
System.Runtime.Serialization.SerializationException: 'The input stream
is not a valid binary format.
这是实现我需要的正确方法吗?我认为我采用了正确的方法,但我在提取邮件正文时遇到问题。
更新:
邮件正文使用 Console.WriteLine("{0}", (new StreamReader(stream)).ReadToEnd());
:
{\r\n \"messageId\":
\"01110000-5d1b-0015-b7c9-08d5479e73b2\",\r\n \"correlationId\":
\"da197981-a10e-43a9-a53b-67b7a3261511\",\r\n \"conversationId\":
\"01110000-5d1b-0015-80d3-08d5479e73a6\",\r\n \"sourceAddress\":
\"queue URL",\r\n \"destinationAddress\": \"receiverURL",\r\n
\"messageType\": [\r\n \"urn:message:MessageType"\r\n ],\r\n
\"message\": {\r\n \"correlationId\":
\"da197981-a10e-43a9-a53b-67b7a3261511\",\r\n \"Id\":
\"62d2bfff-7e20-4b51-b431-7d46e44abfc0\",\r\n \"Name\":
\"'''yry'''\",\r\n \"Code\": \"ryryyy\",\r\n \"isActive\":
true,\r\n \"username\": \"username\",\r\n \"password\":
\"*****\",\r\n \"isUserActive\": true\r\n },\r\n \"headers\":
{},\r\n \"host\": {\r\n \"machineName\": \"myMachine\",\r\n
\"processName\": \"w3wp\",\r\n \"processId\": 4568,\r\n
\"assembly\": \"MassTransit\",\r\n \"assemblyVersion\":
\"3.5.7.1082\",\r\n \"frameworkVersion\": \"4.0.30319.42000\",\r\n
\"massTransitVersion\": \"3.5.7.1082\",\r\n
\"operatingSystemVersion\": \"Microsoft Windows NT 10.0.16299.0\"\r\n
}\r\n}"
因此您无需为 BinaryFormatter
操心。相反,您应该使用 System.Runtime.Serialization.DataContractJsonSerializer
,因为您的流内容以 JSON 的形式出现。您可以使用以下代码执行此操作:
DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(MassTransit));
MassTransit obj = (MassTransit)serializer.ReadObject(stream);
这是假设您尝试反序列化的对象属于 MassTransit
类型。如果不是,则只需将其替换为您要反序列化的实例的任何 class。
您可以将其设为通用方法,如下所示:
public static T JsonDeserializer<T>(MemoryStream strm)
{
DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(T));
return (T)serializer.ReadObject(strm);
}
只要您想从 JSON 流中反序列化一个对象,只需调用它即可。
想通了:
正如您在问题中看到的那样 body context.GetBody();
给出了 json 包含主机信息、Headers 和消息 body 除了一些其他属性。
我所做的是用“删除”\r\n"s and replacing \" 然后继续
到 https://jsonutils.com/,粘贴生成的 Json,它将把它转换成你应该反序列化到的 类。
问题是 body 包含的信息比发送的消息多得多,所有信息都是由 masstransit 和 rabbitmq 添加的。
public class Message
{
//Message attributes
}
public class Headers
{
}
public class Host
{
public string machineName { get; set; }
public string processName { get; set; }
public int processId { get; set; }
public string assembly { get; set; }
public string assemblyVersion { get; set; }
public string frameworkVersion { get; set; }
public string massTransitVersion { get; set; }
public string operatingSystemVersion { get; set; }
}
public class TotalMessage
{
public string messageId { get; set; }
public string correlationId { get; set; }
public string conversationId { get; set; }
public string sourceAddress { get; set; }
public string destinationAddress { get; set; }
public IList<string> messageType { get; set; }
public Message message { get; set; }
public Headers headers { get; set; }
public Host host { get; set; }
}
反序列化你可以使用:
public static object DeserializeFromStream(MemoryStream stream)
{
IFormatter formatter = new BinaryFormatter();
stream.Seek(0, SeekOrigin.Begin);
object o = formatter.Deserialize(stream);
return o;
}
var body = context.GetBody();
var totalMessage= JsonDeserializer<TotalMessage>((MemoryStream)body);
获取消息body:totalMessage.message
更新:
当然,您可以通过以下方式反序列化为消息 body:
public class TotalMessage
{
public Message message { get; set; }
}
我正在尝试从我的服务使用的所有消息中获取特定信息,我不想在我拥有的每个消费者中重复相同的代码。
所以我创建了一个 class 来实现 IReceiveObserver 并在内部 public Task PreReceive(ReceiveContext context)
我试图获取消息正文,但我做不到。
public Task PreReceive(ReceiveContext context)
{
var body = context.GetBody();
return null;
}
我假设 body 将包含消息正文,所以我尝试使用以下方法对其进行反序列化:
public static object DeserializeFromStream(MemoryStream stream)
{
IFormatter formatter = new BinaryFormatter();
stream.Seek(0, SeekOrigin.Begin);
object o = formatter.Deserialize(stream);
return o;
}
var Deserialized =(GeneralMessageType) DeserializeFromStream((MemoryStream)body);
我收到以下错误:
System.Runtime.Serialization.SerializationException: 'The input stream is not a valid binary format.
这是实现我需要的正确方法吗?我认为我采用了正确的方法,但我在提取邮件正文时遇到问题。
更新:
邮件正文使用 Console.WriteLine("{0}", (new StreamReader(stream)).ReadToEnd());
:
{\r\n \"messageId\": \"01110000-5d1b-0015-b7c9-08d5479e73b2\",\r\n \"correlationId\": \"da197981-a10e-43a9-a53b-67b7a3261511\",\r\n \"conversationId\": \"01110000-5d1b-0015-80d3-08d5479e73a6\",\r\n \"sourceAddress\": \"queue URL",\r\n \"destinationAddress\": \"receiverURL",\r\n \"messageType\": [\r\n \"urn:message:MessageType"\r\n ],\r\n \"message\": {\r\n \"correlationId\": \"da197981-a10e-43a9-a53b-67b7a3261511\",\r\n \"Id\": \"62d2bfff-7e20-4b51-b431-7d46e44abfc0\",\r\n \"Name\": \"'''yry'''\",\r\n \"Code\": \"ryryyy\",\r\n \"isActive\": true,\r\n \"username\": \"username\",\r\n \"password\": \"*****\",\r\n \"isUserActive\": true\r\n },\r\n \"headers\": {},\r\n \"host\": {\r\n \"machineName\": \"myMachine\",\r\n
\"processName\": \"w3wp\",\r\n \"processId\": 4568,\r\n
\"assembly\": \"MassTransit\",\r\n \"assemblyVersion\": \"3.5.7.1082\",\r\n \"frameworkVersion\": \"4.0.30319.42000\",\r\n \"massTransitVersion\": \"3.5.7.1082\",\r\n
\"operatingSystemVersion\": \"Microsoft Windows NT 10.0.16299.0\"\r\n }\r\n}"
因此您无需为 BinaryFormatter
操心。相反,您应该使用 System.Runtime.Serialization.DataContractJsonSerializer
,因为您的流内容以 JSON 的形式出现。您可以使用以下代码执行此操作:
DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(MassTransit));
MassTransit obj = (MassTransit)serializer.ReadObject(stream);
这是假设您尝试反序列化的对象属于 MassTransit
类型。如果不是,则只需将其替换为您要反序列化的实例的任何 class。
您可以将其设为通用方法,如下所示:
public static T JsonDeserializer<T>(MemoryStream strm)
{
DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(T));
return (T)serializer.ReadObject(strm);
}
只要您想从 JSON 流中反序列化一个对象,只需调用它即可。
想通了:
正如您在问题中看到的那样 body context.GetBody();
给出了 json 包含主机信息、Headers 和消息 body 除了一些其他属性。
我所做的是用“删除”\r\n"s and replacing \" 然后继续 到 https://jsonutils.com/,粘贴生成的 Json,它将把它转换成你应该反序列化到的 类。
问题是 body 包含的信息比发送的消息多得多,所有信息都是由 masstransit 和 rabbitmq 添加的。
public class Message
{
//Message attributes
}
public class Headers
{
}
public class Host
{
public string machineName { get; set; }
public string processName { get; set; }
public int processId { get; set; }
public string assembly { get; set; }
public string assemblyVersion { get; set; }
public string frameworkVersion { get; set; }
public string massTransitVersion { get; set; }
public string operatingSystemVersion { get; set; }
}
public class TotalMessage
{
public string messageId { get; set; }
public string correlationId { get; set; }
public string conversationId { get; set; }
public string sourceAddress { get; set; }
public string destinationAddress { get; set; }
public IList<string> messageType { get; set; }
public Message message { get; set; }
public Headers headers { get; set; }
public Host host { get; set; }
}
反序列化你可以使用:
public static object DeserializeFromStream(MemoryStream stream)
{
IFormatter formatter = new BinaryFormatter();
stream.Seek(0, SeekOrigin.Begin);
object o = formatter.Deserialize(stream);
return o;
}
var body = context.GetBody();
var totalMessage= JsonDeserializer<TotalMessage>((MemoryStream)body);
获取消息body:totalMessage.message
更新:
当然,您可以通过以下方式反序列化为消息 body:
public class TotalMessage
{
public Message message { get; set; }
}