如何不断从远程 IBM MQ 获取消息

How to get message constantly from remote IBM MQ

我创建了一个 windows 服务,它将连接到远程 MQ 并获取 MQSTR 格式的消息,但在获取消息后我没有关闭与远程 MQ 的连接。我的 windows 服务将持续检查远程 MQ 中的数据是否可用,但在收到一条消息后,我需要重新启动服务以从远程 MQ 获取另一条消息。谁能告诉我我需要做什么才能不断从远程 MQ 获取消息。任何线索或任何 link 都可以。请帮助

我的C#windows服务代码是这样的:

Program.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceProcess;
using System.Text;
using System.Threading.Tasks;

namespace MQ_listner
{
    /// <summary>
    /// Process entry point that hosts the Windows service.
    /// </summary>
    static class Program
    {
        /// <summary>
        /// Builds the list of services (a single <see cref="Service1"/>) and hands it
        /// to <see cref="ServiceBase.Run(ServiceBase[])"/>.
        /// </summary>
        static void Main()
        {
            ServiceBase.Run(new ServiceBase[] { new Service1() });
        }
    }
}

Service1.cs

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;


namespace MQ_listner
{
    /// <summary>
    /// Windows service that creates an <see cref="MQReader"/> on a background thread
    /// when started and writes start/stop information to the event log.
    /// </summary>
    public partial class Service1 : ServiceBase
    {
        private MQReader MQReader;
        private string _serviceName = "MQ_Listener";
        private DateTime _TimeStart;
        // Written by OnStart/OnStop and read by the worker thread, hence volatile.
        private volatile bool _run = true;
        private Thread _thread;
        // Timeout in milliseconds that OnStop passes to Thread.Join.
        int WaitWhenStop = 0;
        private DateTime _TimeEnd;
        private TimeSpan _TimeDifference;
        private TimeSpan _TimeElasped = new TimeSpan(0);

        public Service1()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Records the start time, logs it and launches the worker thread that runs
        /// <see cref="StartMQListenerService"/>. Any exception is logged, not rethrown.
        /// </summary>
        /// <param name="args">Start arguments supplied by the service control manager (unused).</param>
        protected override void OnStart(string[] args)
        {
            try
            {
                // Capture the start time first: it is logged below and OnStop subtracts
                // it from the stop time to compute how long the service ran.
                _TimeStart = DateTime.Now;
                EventLog.WriteEntry(_serviceName + " was started at " + _TimeStart.ToString());
                _run = true;

                _thread = new Thread(new ThreadStart(StartMQListenerService));
                _thread.IsBackground = true;
                _thread.Start();
            }
            catch (Exception ex)
            {
                EventLog.WriteEntry(_serviceName + " was not started. Error Message : " + ex.ToString());
            }
        }

        /// <summary>
        /// Signals the worker to stop, waits up to <c>WaitWhenStop</c> milliseconds for it,
        /// and logs the stop time together with the accumulated run time.
        /// </summary>
        protected override void OnStop()
        {
            _run = false;

            // OnStart swallows its exceptions, so the thread may never have been created.
            if (_thread != null)
            {
                _thread.Join(WaitWhenStop);
            }

            _TimeEnd = DateTime.Now;
            _TimeDifference = _TimeEnd.Subtract(_TimeStart);
            _TimeElasped = _TimeElasped.Add(_TimeDifference);
            EventLog.WriteEntry(_serviceName + " was stopped at " + _TimeEnd.ToString() + "\r\n ran for total time :" + _TimeElasped.ToString());
        }

        // MQ connection service

        /// <summary>
        /// Worker-thread body: creates the <see cref="MQReader"/> on first use and calls
        /// its <c>InitializeConnections</c> method. On failure the exception is logged and
        /// "NET stop" is launched for this service.
        /// </summary>
        /// <remarks>
        /// NOTE(review): this method contains no loop, so it executes once per service
        /// start. Continuous polling has to be added here or inside MQReader.
        /// </remarks>
        public void StartMQListenerService()
        {
            try
            {
                if (_run && MQReader == null)
                {
                    MQReader = new MQReader();
                    MQReader.InitializeConnections();
                    EventLog.WriteEntry(_serviceName + " MQ connection is established");
                }
            }
            catch (Exception ex)
            {
                System.Diagnostics.EventLog.WriteEntry(_serviceName, ex.ToString());
                System.Diagnostics.ProcessStartInfo startinfo = new System.Diagnostics.ProcessStartInfo();
                startinfo.WindowStyle = System.Diagnostics.ProcessWindowStyle.Hidden;
                startinfo.FileName = "NET";
                // The verb and the service name must be separate arguments; the name is
                // quoted so that a service name containing spaces still works.
                startinfo.Arguments = "stop \"" + this.ServiceName + "\"";
                Process.Start(startinfo);
            }
        }
    }
}


MQReader.cs

using System;
using IBM.WMQ;
using System.Diagnostics;
using System.IO;
using System.Xml;
using System.Linq;
using System.Xml.Linq;
using System.Configuration;

namespace MQ_listner
{
    /// <summary>
    /// Reads one message from the IBM MQ queue described in the application settings
    /// and writes the result to a text file.
    /// </summary>
    internal class MQReader
    {
        public MQReader()
        {
        }

        /// <summary>
        /// Connects to the queue manager, opens the queue, gets a single message and
        /// writes its text (or the MQ exception message) to <c>C:\documents\Example.txt</c>.
        /// The queue is closed and the queue manager disconnected before returning.
        /// </summary>
        /// <remarks>
        /// Uses the <c>QueueManager</c>, <c>Queuename</c>, <c>Channel</c> and
        /// <c>ConnectionName</c> application settings. Only one message is read per call.
        /// </remarks>
        public void InitializeConnections()
        {
            string queueManagerName = ConfigurationManager.AppSettings["QueueManager"];
            string queueName = ConfigurationManager.AppSettings["Queuename"];
            string channelName = ConfigurationManager.AppSettings["Channel"];
            string connectionName = ConfigurationManager.AppSettings["ConnectionName"];

            MQQueueManager queueManager = null;
            MQQueue queue = null;
            string strReturn = "";

            try
            {
                queueManager = new MQQueueManager(queueManagerName, channelName, connectionName);
                strReturn = "Connected Successfully";

                queue = queueManager.AccessQueue(queueName,
                    MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);

                MQMessage queueMessage = new MQMessage();
                queueMessage.Format = MQC.MQFMT_STRING;
                MQGetMessageOptions queueGetMessageOptions = new MQGetMessageOptions();

                queue.Get(queueMessage, queueGetMessageOptions);
                strReturn = queueMessage.ReadString(queueMessage.MessageLength);
            }
            catch (MQException exp)
            {
                strReturn = "Exception: " + exp.Message;
            }
            finally
            {
                // Everything that was opened or connected must be released, otherwise
                // each call leaves a queue handle and a connection behind.
                ReleaseResources(queue, queueManager);
            }

            string path1 = @"C:\documents\Example.txt";
            System.IO.File.WriteAllText(path1, strReturn);
        }

        // Closes the queue and disconnects the queue manager; failures are traced
        // rather than thrown so they cannot hide the result of the read.
        private static void ReleaseResources(MQQueue queue, MQQueueManager queueManager)
        {
            try
            {
                if (queue != null)
                {
                    queue.Close();
                }
            }
            catch (MQException closeEx)
            {
                Trace.WriteLine("MQ queue close failed: " + closeEx.Message);
            }

            try
            {
                if (queueManager != null)
                {
                    queueManager.Disconnect();
                }
            }
            catch (MQException disconnectEx)
            {
                Trace.WriteLine("MQ disconnect failed: " + disconnectEx.Message);
            }
        }
    }
}

谁能告诉我我的代码有什么问题?我需要在此处添加任何内容以不断从远程 MQ 获取消息吗?请帮忙 。任何 link 或线索都可以。

编辑

经过一定时间后,我需要重新启动我的服务以从远程 mq 获取数据。你能告诉我为什么 windows 服务需要重新启动才能获取数据吗?有什么线索吗?有什么想法吗?

您的队列关闭和队列管理器断开连接的代码在哪里?如果您连接和/或打开了某些东西,就必须确保将其关闭并断开连接。我强烈建议您参加 MQ 编程课程,或者去参加 MQ Technical Conference,那里有关于 MQ 编程的专题会议。

我在“MQQueueManager message pooling”这个问题下发布了一个功能齐全的 C# MQ 程序,它会检索队列中的所有消息。

这是您的 MQReader class 的更新版本,应该会给您正确的想法。注意:我没有测试它。我把那个留给你。 :)

此外,您应该将连接信息放入哈希表中,并将该哈希表传递给 MQQueueManager class。

using System;
using IBM.WMQ;
using System.Diagnostics;
using System.IO;
using System.Xml;
using System.Linq;
using System.Xml.Linq;
using System.Configuration;

namespace MQ_listner
{
    /// <summary>
    /// Connects to an IBM MQ queue manager using the application settings and reads
    /// messages from a queue in a loop until asked to stop.
    /// </summary>
    internal class MQReader
    {
        private const int DefaultPort = 1414;

        private MQQueueManager qManager = null;
        private MQQueue        inQ = null;
        // Cleared by StopIt from another thread while LoopThruMessages is polling.
        private volatile bool  running = true;

        public MQReader()
        {
        }

        /// <summary>
        /// Connects to the queue manager and opens the input queue.
        /// </summary>
        /// <remarks>
        /// Reads the <c>ConnectionName</c>, <c>Channel</c>, <c>Port</c>, <c>UserID</c>,
        /// <c>Password</c>, <c>QueueManager</c> and <c>Queuename</c> application settings.
        /// A missing or unparsable <c>Port</c> falls back to 1414.
        /// </remarks>
        /// <returns><c>true</c> when both the connection and the queue open succeeded;
        /// <c>false</c> when an <see cref="MQException"/> was raised.</returns>
        public bool InitQMgrAndQueue()
        {
            // Fully qualified because System.Collections is not among the file's usings.
            System.Collections.Hashtable qMgrProp = new System.Collections.Hashtable();
            qMgrProp.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED);
            qMgrProp.Add(MQC.HOST_NAME_PROPERTY, ConfigurationManager.AppSettings["ConnectionName"]);
            qMgrProp.Add(MQC.CHANNEL_PROPERTY, ConfigurationManager.AppSettings["Channel"]);

            // TryParse also covers null and out-of-range values.
            int port;
            if (!System.Int32.TryParse(ConfigurationManager.AppSettings["Port"], out port))
            {
                port = DefaultPort;
            }
            qMgrProp.Add(MQC.PORT_PROPERTY, port);

            if (ConfigurationManager.AppSettings["UserID"] != null)
            {
                qMgrProp.Add(MQC.USER_ID_PROPERTY, ConfigurationManager.AppSettings["UserID"]);
            }

            if (ConfigurationManager.AppSettings["Password"] != null)
            {
                qMgrProp.Add(MQC.PASSWORD_PROPERTY, ConfigurationManager.AppSettings["Password"]);
            }

            try
            {
                qManager = new MQQueueManager(ConfigurationManager.AppSettings["QueueManager"],
                                              qMgrProp);
                System.Console.Out.WriteLine("Connected Successfully");

                inQ = qManager.AccessQueue(ConfigurationManager.AppSettings["Queuename"],
                                           MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);
                System.Console.Out.WriteLine("Open queue Successfully");
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
                return false;
            }

            return true;
        }

        /// <summary>
        /// Gets messages from the open queue until <see cref="StopIt"/> is called or a
        /// severe MQ error occurs, then closes the queue and disconnects from the
        /// queue manager.
        /// </summary>
        public void LoopThruMessages()
        {
            MQGetMessageOptions gmo = new MQGetMessageOptions();
            gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
            gmo.WaitInterval = 2500;  // 2.5 seconds wait time or use MQC.MQEI_UNLIMITED to wait forever

            if (inQ == null)
            {
                // Nothing to read from; fall through to the cleanup below.
                System.Console.Out.WriteLine("Queue is not open - call InitQMgrAndQueue first");
            }

            while (running && inQ != null)
            {
                try
                {
                    // A fresh message object per Get, so no state carries over from
                    // the previous message.
                    MQMessage msg = new MQMessage();
                    inQ.Get(msg, gmo);
                    System.Console.Out.WriteLine("Message Data: " + msg.ReadString(msg.MessageLength));
                }
                catch (MQException mqex)
                {
                    if (mqex.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                    {
                        // no message - life is good - loop again
                    }
                    else
                    {
                        running = false;  // severe error - time to exit
                        System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
                    }
                }
                catch (System.IO.IOException ioex)
                {
                    System.Console.Out.WriteLine("ioex=" + ioex);
                }
            }

            try
            {
                if (inQ != null)
                {
                    inQ.Close();
                    System.Console.Out.WriteLine("Closed queue");
                }
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
            }
            finally
            {
                inQ = null;
            }

            try
            {
                if (qManager != null)
                {
                    qManager.Disconnect();
                    System.Console.Out.WriteLine("disconnected from queue manager");
                }
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
            }
            finally
            {
                qManager = null;
            }
        }

        /// <summary>
        /// Asks <see cref="LoopThruMessages"/> to leave its loop; the loop sees the
        /// request once the current Get returns.
        /// </summary>
        public void StopIt()
        {
            running = false;
        }
    }
}

无论何时停止服务,请确保它调用 MQReader 中的 StopIt 方法。