如何在 C# 中从 MSMQ 读取数据? (请注意 - 数据不是用 C# 编写的)

How to read data from MSMQ in C# ? (please note - Data is not written in C# )

我在我的项目中使用 nodejs、msmq 和 .net 作为技术堆栈。我正在使用 nodejs 脚本从硬件收集数据并写入 MSMQ。 我想在 .net 中阅读此 MSMQ,但出现以下错误。 此外,我也无法收到消息。

Error System.Xml.XmlException: Data at the root level is invalid. Line 1, position at System.Xml.XmlTextReaderImpl.Throw(Exception e) at System.Xml.XmlTextReaderImpl.Throw(String res, String arg) 'objMessage.ConnectorType' threw an exception of type 'System.InvalidOperationException' 'objMessage.Body' threw an exception of type 'System.Xml.XmlException'

//Nodejs script
const msmq = require('updated-node-msmq');
const queue = msmq.openOrCreateQueue('.\Private$\EEG');
queue.send(records);   
// C# code
if (MessageQueue.Exists(@".\Private$\EEG")){
    objMessageQueue = new MessageQueue(@".\Private$\EEG");
}
objMessage = objMessageQueue.Receive();
objMessage.Formatter = new XmlMessageFormatter(new Type[] 
     {typeof(Payload) });
var message = (Payload)objMessage.Body;

我能够解决这个问题。我没有使用 message.Body,而是使用 message.BodyStream 并将该数据转换为二进制格式。这是我的解决方案。

   using System;
using System.Messaging;
using System.Data.SqlClient;
using System.Data;
using Newtonsoft.Json;
using System.IO;
using System.Text;

namespace NeuroskyListener
{
    class Program
    {
        public static bool isRunning;
        private static MessageQueue messageQueue;

        public static void Main(string[] args)
        {
            InitializeQueue();
            Console.ReadLine();
        }

        private static void InitializeQueue()
        {
            string queuePath = @".\Private$\eegNew";

            if (!MessageQueue.Exists(queuePath))
            {
                messageQueue = MessageQueue.Create(queuePath);
            }
            else
            {
                messageQueue = new MessageQueue(queuePath);
            }
            isRunning = true;
            messageQueue.Formatter = new XmlMessageFormatter(new Type[] {typeof(string) });
            messageQueue.ReceiveCompleted += OnReceiveCompleted;
            messageQueue.BeginReceive();
        }

        private static void OnReceiveCompleted(object source, ReceiveCompletedEventArgs asyncResult)
        {
            try
            {   
                MessageQueue mq =  (MessageQueue)source;

                if (mq != null)
                {
                    try
                    {
                        System.Messaging.Message message = null;
                        try
                        {
                            message = mq.EndReceive(asyncResult.AsyncResult);
                            BinaryReader reader = new BinaryReader(message.BodyStream);

                            int count = (int)message.BodyStream.Length;

                            byte[] bytes = reader.ReadBytes(count);


                           string bodyString = Encoding.UTF8.GetString(bytes);
                            eegPayload lst = JsonConvert.DeserializeObject<eegPayload>(bodyString);

                        }
                        catch (Exception ex)
                        {
                           // LogMessage(ex.Message);
                        }
                        if (message != null)
                        {
                            //Payload payload = message.Body as Payload;

                            Console.WriteLine(message.Body);

                            //if (payload != null)
                            //{
                            //    receivedCounter++;
                            //    if ((receivedCounter % 10000) == 0)
                            //    {
                            //        string messageText = string.Format("Received {0} messages", receivedCounter);
                            //        LogMessage(messageText);
                            //    }
                            //}
                        }
                    }
                    finally
                    {
                        if (isRunning)
                        {
                            mq.BeginReceive();
                        }
                    }
                }
                return;
            }
            catch (Exception exc)
            {
                //LogMessage(exc.Message);
            }
        }
    }
}