Azure 服务总线 - 使用 OnMessage() 方法接收消息

Azure Service Bus - Receive Messages with OnMessage() Method

MS documentation 之后,从订阅中接收消息并不困难。但是,如果我希望我的应用程序在每次 posted 新消息时都收到一条消息 - 持续轮询。因此 OnMessage() 方法 SubscriptionClient class.

MS documentation 说: "...当调用 OnMessage 时,客户端会启动一个内部消息泵,不断轮询队列或订阅。此消息泵包含发出 Receive() 调用的无限循环。如果调用超时,它会发出下一个 Receive() 调用。..."

但是当应用程序是 运行 时,调用 OnMessage() 方法的那一刻只收到最新的消息。当新消息被 posted 时,持续轮询似乎不起作用。在尝试了许多不同的方法后 我唯一能让它工作并让应用程序在收到新消息时做出反应的方法是将代码放入一个具有无限循环的单独任务中。这在很多层面上似乎都是完全错误的!(见下面的代码)。

任何人都可以帮助我更正我的代码或 post 一个工作示例来在没有循环的情况下完成相同的功能吗?谢谢!

 public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter)
        {
            var newMessage = new MessageQueue();
            int i = 0;

            Task listener = Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

                    Dictionary<string, string> retrievedMessage = new Dictionary<string, string>();

                    OnMessageOptions options = new OnMessageOptions();
                                     options.AutoComplete = false;
                                     options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

                    Client.OnMessage((message) =>
                    {
                        try
                        {
                            retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString());
                            retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString());
                            retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString());
                            retrievedMessage.Add("processKey", message.Properties["ProcessKey"].ToString());
                            retrievedMessage.Add("message", message.Properties["Message"].ToString());

                            newMessage.AnnounceNewMessage(retrievedMessage); // event ->

                            message.Complete(); // Remove message from subscription.
                        }
                        catch (Exception ex)
                        {
                            string exmes = ex.Message;
                            message.Abandon();
                        }

                    }, options);

                    retrievedMessage.Clear();

                    i++;

                    Thread.Sleep(3000);
                }

            });
        }

您的代码有几个问题需要解决 -

  • 它失败了,我假设你的应用程序然后退出 - 或者在 至少正在侦听消息的线程终止。
  • 您的 while 循环不断重复代码以连接消息处理程序, 你只需要这样做一次。
  • 您需要一种方法来使调用堆栈保持活动状态并防止您的应用程序对您的对象进行垃圾回收。

下面的内容应该可以帮助您走向成功。祝你好运。

 ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
    SubscriptionClient Client;

    public void ReceiveMessagesFromSubscription(string topicName, string subscriptionFilter, string connectionString)
    {
        Task listener = Task.Factory.StartNew(() =>
        {
            // You only need to set up the below once. 
            Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

            OnMessageOptions options = new OnMessageOptions();
            options.AutoComplete = false;
            options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
            options.ExceptionReceived += LogErrors;

            Client.OnMessage((message) =>
            {
                try
                {
                    Trace.WriteLine("Got the message with ID {0}", message.MessageId);
                    message.Complete(); // Remove message from subscription.
                }
                catch (Exception ex)
                {
                    Trace.WriteLine("Exception occurred receiving a message: {0}" + ex.ToString());
                    message.Abandon(); // Failed. Leave the message for retry or max deliveries is exceeded and it dead letters.
                }

            }, options);

            CompletedResetEvent.WaitOne();
        });
    }

    /// <summary>
    /// Added in rudimentary exception handling .
    /// </summary>
    /// <param name="sender">The sender.</param>
    /// <param name="ex">The <see cref="ExceptionReceivedEventArgs"/> instance containing the event data.</param>
    private void LogErrors(object sender, ExceptionReceivedEventArgs ex)
    {
        Trace.WriteLine("Exception occurred in OnMessage: {0}" + ex.ToString());
    }

    /// <summary>
    /// Call this to stop the messages arriving from subscription.
    /// </summary>
    public void StopMessagesFromSubscription()
    {
        Client.Close(); // Close the message pump down gracefully
        CompletedResetEvent.Set(); // Let the execution of the listener complete and terminate gracefully 
    }

或者,您可以自己使用 ReceiveBatch 以更传统的方式将消息分块:

var messages = await queueClient.ReceiveBatchAsync(10, TimeSpan.FromSeconds(30),
                                                       cancellationToken);