Azure 主题辅助角色在 60 秒后停止处理消息

Azure Topics worker role stops processing message after 60 seconds

我们有一个云服务,它使用辅助角色(Worker Role)来处理从 Azure 服务总线上所设主题接收到的消息。

消息本身似乎完好无损,通常都能被正确接收和处理。然而在某些情况下,消息处理似乎会中途停止(日志记录戛然而止,WadLogsTable 中也再看不到该消息的后续处理记录)。根据我的调研,这可能是因为辅助角色的连接保持打开并空闲超过若干秒所致。我该如何防止这些需要长时间处理的消息被中途放弃?

我们辅助角色的代码如下:

/// <summary>
/// Worker role (version from the question) that receives batches of CallInformation from the
/// "MyTopic"/"AllMessages" Service Bus subscription and writes them through repositories.
/// </summary>
public class WorkerRole : RoleEntryPoint
{
    private static StandardKernel _kernel;
    // NOTE(review): signalled in OnStop, but nothing in this class ever waits on it.
    private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);
    // NOTE(review): not referenced anywhere in the visible code.
    private BaseRepository<CallData> _callDataRepository;
    private BaseRepository<CallLog> _callLogRepository;

    private SubscriptionClient _client;
    private NamespaceManager _nManager;
    private OnMessageOptions _options;
    // NOTE(review): not referenced anywhere in the visible code.
    private BaseRepository<Site> _siteRepository;

    /// <summary>
    /// Registers the OnMessage callback on <c>_client</c>; exceptions from registration are logged.
    /// </summary>
    /// <remarks>
    /// NOTE(review): OnMessage only starts the pump and returns, so this method returns almost
    /// immediately. RoleEntryPoint.Run is expected to block; when it returns, the role is recycled
    /// and OnStop closes <c>_client</c>, which stops the pump — likely the reported symptom.
    /// </remarks>
    public override void Run()
    {
        try
        {
            // NOTE(review): declared outside the lambda, so every callback invocation shares this variable.
            List<CallInformation> callInfo;
            Trace.WriteLine("Starting processing of messages");

            // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.

            _client.OnMessage(message =>
            {
                // Process message from subscription.
                Trace.TraceInformation("Call Received. Ready to process message ");
                // NOTE(review): _client is created with ReceiveMode.ReceiveAndDelete in OnStart; RenewLock is a
                // PeekLock operation and is expected to throw InvalidOperationException in that mode — verify.
                message.RenewLock();
                callInfo = message.GetBody<List<CallInformation>>();
                writeCallData(callInfo);


                Trace.TraceInformation("Call Processed. Clearing from topic.");
            }, _options);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Error: " + e.Message + "---" + e.StackTrace);
        }
    }

    /// <summary>
    /// Inserts each call, then each of its datapoints, via <c>_callLogRepository</c>.
    /// </summary>
    /// <param name="callList">Calls taken from one Service Bus message body.</param>
    /// <remarks>
    /// Any exception is logged (message text only) and swallowed, so a failure part-way through
    /// skips the remaining calls in the list without the caller noticing.
    /// </remarks>
    private void writeCallData(List<CallInformation> callList)
    {
        try
        {
            Trace.TraceInformation("Calls received: " + callList.Count);
            foreach (var callInfo in callList)
            {
                Trace.TraceInformation("Unwrapping call...");
                var call = callInfo.CallLog.Unwrap();
                Trace.TraceInformation("Begin Processing: Local Call " + call.ID + " with " + callInfo.DataPoints.Length + " datapoints");
                Trace.TraceInformation("Inserting Call...");
                _callLogRepository.ExecuteSqlCommand(/*SNIP: Insert call*/);
                    // The extra indentation below is cosmetic; these statements run unconditionally.
                    Trace.TraceInformation("Call entry written. Now building datapoint list...");
                    var datapoints = callInfo.DataPoints.Select(datapoint => datapoint.Unwrap()).ToList();
                    Trace.TraceInformation("datapoint list constructed. Processing datapoints...");
                    foreach (var data in datapoints)
                    {
                        /*SNIP: Long running code. Insert our datapoints one at a time. Sometimes our messages die in the middle of this foreach. */
                    }
                    Trace.TraceInformation("All datapoints written for call with dependable ID " + call.Call_ID);
                Trace.TraceInformation("Call Processed successfully.");
            }
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Call Processing Failed. " + e.Message);
        }
    }

    /// <summary>
    /// Creates the topic and subscription if missing, the subscription client, and the pump options.
    /// </summary>
    /// <remarks>
    /// Exceptions are logged and swallowed and base.OnStart() is still returned, so <c>_client</c>
    /// may be null by the time Run executes.
    /// </remarks>
    public override bool OnStart()
    {
        try
        {
            var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            _nManager = NamespaceManager.CreateFromConnectionString(connectionString);
            _nManager.Settings.OperationTimeout = new TimeSpan(0,0,10,0);
            var topic = new TopicDescription("MyTopic")
            {
                DuplicateDetectionHistoryTimeWindow = new TimeSpan(0, 0, 10, 0),
                DefaultMessageTimeToLive = new TimeSpan(0, 0, 10, 0),
                RequiresDuplicateDetection = true,
            };
            if (!_nManager.TopicExists("MyTopic"))
            {
                _nManager.CreateTopic(topic);
            }
            if (!_nManager.SubscriptionExists("MyTopic", "AllMessages"))
            {
                _nManager.CreateSubscription("MyTopic", "AllMessages");
            }
            // NOTE(review): in ReceiveAndDelete mode a message is removed from the subscription when it is
            // received, so it is lost if processing stops part-way; AutoRenewTimeout below is a PeekLock setting.
            _client = SubscriptionClient.CreateFromConnectionString(connectionString, "MyTopic", "AllMessages",
                ReceiveMode.ReceiveAndDelete);
            _options = new OnMessageOptions
            {
                    AutoRenewTimeout = TimeSpan.FromMinutes(5),

            };
            _options.ExceptionReceived += LogErrors;
            CreateKernel();

            // NOTE(review): _callLogRepository is not assigned in the visible code; presumably the snipped
            // bindings in CreateKernel populate it — confirm.
            _callLogRepository.ExecuteSqlCommand(/*SNIP: Background processing*/);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Error on roleStart:" + e.Message + "---" + e.StackTrace);
        }
        return base.OnStart();
    }

    /// <summary>
    /// Closes <c>_client</c> (stopping the message pump) and signals <c>_completedEvent</c>.
    /// </summary>
    public override void OnStop()
    {
        // Close the connection to Service Bus Queue
        _client.Close();
        _completedEvent.Set();
    }

    /// <summary>
    /// Handler for OnMessageOptions.ExceptionReceived: logs the exception and closes <c>_client</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): closing the client stops the pump for good on any reported exception, and nothing
    /// in this class restarts it.
    /// </remarks>
    void LogErrors(object sender, ExceptionReceivedEventArgs e)
    {
        if (e.Exception != null)
        {
            Trace.TraceInformation("Error: " + e.Exception.Message + "---" + e.Exception.StackTrace);
            _client.Close();
        }
    }

    /// <summary>
    /// Creates the Ninject kernel, stores it in the static <c>_kernel</c>, and returns it.
    /// </summary>
    public IKernel CreateKernel()
    {
        _kernel = new StandardKernel();
        /*SNIP: Bind NInjectable repositories */
        return _kernel;
    }
}

您的 Run 方法不会一直阻塞运行下去。它应该类似这样:

/// <summary>
/// Role entry point that does not return under normal operation.
/// </summary>
/// <remarks>
/// If Run returns, the role is recycled, so the loop must keep running. The sleep keeps the
/// instance alive without spinning: an empty <c>while (true)</c> busy-waits on a full CPU core.
/// </remarks>
public override void Run()
{
   try
   {
      Trace.WriteLine("WorkerRole entrypoint called", "Information");
      while (true)
      {
         // Add code here that runs in the role instance
         System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
      }

   }
   catch (Exception e)
   {
      Trace.WriteLine("Exception during Run: " + e.ToString());
      // Take other action as needed.
   }
}

摘自官方文档:

The Run is considered the Main method for your application. Overriding the Run method is not required; the default implementation never returns. If you do override the Run method, your code should block indefinitely. If the Run method returns, the role is automatically recycled by raising the Stopping event and calling the OnStop method so that your shutdown sequences may be executed before the role is taken offline.

TheDude 的回答已经非常接近正确答案!事实证明他说得对:Run 方法需要保持运行,而不能立即返回。但是,在使用 Azure 服务总线的消息泵机制时,不能把 _client.OnMessage(...) 放进 while 循环中,因为这会导致错误(消息泵已经初始化)。

真正需要做的是:在辅助角色开始执行之前创建一个 ManualResetEvent(手动重置事件),然后在启动消息泵的代码执行之后等待该事件。有关 ManualResetEvent 的文档,请参阅 https://msdn.microsoft.com/en-us/library/system.threading.manualresetevent(v=vs.110).aspx 。此外,该过程在此处也有描述:http://www.acousticguitar.pro/questions/607359/using-queueclient-onmessage-in-an-azure-worker-role

我最终的辅助角色类如下所示:

/// <summary>
/// Worker role that consumes CallInformation messages from the "MyTopic"/"AllMessages"
/// Service Bus subscription and writes them through <c>_callLogRepository</c>.
/// </summary>
public class WorkerRole : RoleEntryPoint
{
    private static StandardKernel _kernel;

    // Signalled by OnStop. Run blocks on this event so the role instance stays alive while the
    // OnMessage pump delivers messages on its own threads.
    private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);
    private BaseRepository<CallLog> _callLogRepository;

    private SubscriptionClient _client;
    private NamespaceManager _nManager;
    private OnMessageOptions _options;

    /// <summary>
    /// Starts the Service Bus message pump, then blocks until OnStop signals <c>_completedEvent</c>.
    /// </summary>
    /// <remarks>
    /// OnMessage only registers the callback and returns immediately; RoleEntryPoint.Run must block,
    /// because a returning Run recycles the role. If the pump cannot be started, Run returns on
    /// purpose so the role is recycled rather than left running with nothing to do.
    /// </remarks>
    public override void Run()
    {
        try
        {
            // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
            _client.OnMessage(message =>
            {
                // Process message from subscription.
                Trace.TraceInformation("Call Received. Ready to process message " + message.MessageId);
                // Local to the callback so concurrent invocations never share it.
                var callInfo = message.GetBody<CallInformation>();
                WriteCallData(new List<CallInformation> { callInfo });

                Trace.TraceInformation("Call Processed. Clearing from topic.");
            }, _options);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Error: " + e.Message + "---" + e.StackTrace);
            // The pump never started (e.g. OnStart swallowed an error and _client is null).
            return;
        }

        // Wait on the field that OnStop sets; a locally created event would never be signalled.
        _completedEvent.WaitOne();
    }

    /// <summary>
    /// Inserts each call, then each of its datapoints, via <c>_callLogRepository</c>.
    /// </summary>
    /// <param name="callList">Calls taken from one Service Bus message.</param>
    /// <remarks>
    /// Any exception is logged and swallowed, so the message callback always returns normally;
    /// a failure part-way through skips the remaining calls in the list.
    /// </remarks>
    private void WriteCallData(List<CallInformation> callList)
    {
        try
        {
            Trace.TraceInformation("Calls received: " + callList.Count);
            foreach (var callInfo in callList)
            {
                Trace.TraceInformation("Unwrapping call...");
                var call = callInfo.CallLog.Unwrap();
                Trace.TraceInformation("Begin Processing: Local Call " + call.ID + " with " + callInfo.DataPoints.Length + " datapoints");
                Trace.TraceInformation("Inserting Call...");
                _callLogRepository.ExecuteSqlCommand(/*SNIP: Insert call*/);
                Trace.TraceInformation("Call entry written. Now building datapoint list...");
                var datapoints = callInfo.DataPoints.Select(datapoint => datapoint.Unwrap()).ToList();
                Trace.TraceInformation("datapoint list constructed. Processing datapoints...");
                foreach (var data in datapoints)
                {
                    /*SNIP: Long running code. Insert our datapoints one at a time. Sometimes our messages die in the middle of this foreach. */
                }
                Trace.TraceInformation("All datapoints written for call with dependable ID " + call.Call_ID);
                Trace.TraceInformation("Call Processed successfully.");
            }
        }
        catch (Exception e)
        {
            // Include the stack trace, consistent with the other error handlers in this class.
            Trace.TraceInformation("Call Processing Failed. " + e.Message + "---" + e.StackTrace);
        }
    }

    /// <summary>
    /// Ensures the topic and subscription exist, creates the subscription client and the
    /// message-pump options, and initialises the Ninject kernel.
    /// </summary>
    /// <remarks>
    /// Exceptions are logged and swallowed so the role still starts; in that case <c>_client</c>
    /// may be null, and Run then returns so the role is recycled.
    /// </remarks>
    public override bool OnStart()
    {
        try
        {
            var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            _nManager = NamespaceManager.CreateFromConnectionString(connectionString);
            _nManager.Settings.OperationTimeout = TimeSpan.FromMinutes(10);
            var topic = new TopicDescription("MyTopic")
            {
                DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),
                DefaultMessageTimeToLive = TimeSpan.FromMinutes(10),
                RequiresDuplicateDetection = true,
            };
            if (!_nManager.TopicExists("MyTopic"))
            {
                _nManager.CreateTopic(topic);
            }
            if (!_nManager.SubscriptionExists("MyTopic", "AllMessages"))
            {
                _nManager.CreateSubscription("MyTopic", "AllMessages");
            }
            // NOTE(review): ReceiveAndDelete removes each message from the subscription as soon as it is
            // received, so a message is lost if the role stops mid-processing, and AutoRenewTimeout (a
            // PeekLock lock-renewal setting) has no effect. ReceiveMode.PeekLock would give redelivery,
            // but the inserts would then have to tolerate duplicates.
            _client = SubscriptionClient.CreateFromConnectionString(connectionString, "MyTopic", "AllMessages",
                ReceiveMode.ReceiveAndDelete);
            _options = new OnMessageOptions
            {
                AutoRenewTimeout = TimeSpan.FromMinutes(5),
            };
            _options.ExceptionReceived += LogErrors;
            CreateKernel();

            // NOTE(review): _callLogRepository is not assigned in the visible code; presumably the
            // snipped bindings in CreateKernel populate it — confirm.
            _callLogRepository.ExecuteSqlCommand(/*SNIP: Background processing*/);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Error on roleStart:" + e.Message + "---" + e.StackTrace);
        }
        return base.OnStart();
    }

    /// <summary>
    /// Stops the message pump by closing <c>_client</c> and releases Run by signalling
    /// <c>_completedEvent</c>.
    /// </summary>
    public override void OnStop()
    {
        // Close the connection to Service Bus; _client is null if OnStart failed before creating it.
        if (_client != null)
        {
            _client.Close();
        }
        _completedEvent.Set();
    }

    /// <summary>
    /// Handler for OnMessageOptions.ExceptionReceived: logs the reported exception.
    /// </summary>
    /// <remarks>
    /// The client is deliberately not closed here: Close() stops the pump permanently, and because
    /// Run keeps the role alive, the instance would then sit idle without processing any messages.
    /// </remarks>
    private void LogErrors(object sender, ExceptionReceivedEventArgs e)
    {
        if (e.Exception != null)
        {
            Trace.TraceInformation("Error: " + e.Exception.Message + "---" + e.Exception.StackTrace);
        }
    }

    /// <summary>
    /// Creates the Ninject kernel, stores it in the static <c>_kernel</c>, and returns it.
    /// </summary>
    public IKernel CreateKernel()
    {
        _kernel = new StandardKernel();
        /*SNIP: Bind NInjectable repositories */
        return _kernel;
    }
}

您会注意到,我的 Run 方法末尾有一个 ManualResetEvent 以及对 WaitOne() 的调用。希望这对大家有所帮助!