消费者运行期间重启 RabbitMQ 服务时,应用程序崩溃并退出调试器

Application crashes & exits debugger while consumer is running & rabbit mq service restarts

我有一个 C# 应用程序,我想在其中测试 RabbitMQ 客户端的弹性(容错能力)。在消费者运行时,我停止了 RabbitMQ 服务,以观察消费者会如何处理这种情况。

我在消费者代码中几乎所有地方都加了 try/catch,但可能是由于后台线程中出现了异常,我的应用程序在输出窗口中打印了如下内容,然后退出了调试器。

The thread 'AMQP Connection amqp://test.com:5671' (0x6da18) has exited with code 0 (0x0).

A first chance exception of type 'System.Net.WebException' occurred in System.dll

然后调试器就退出了。我唯一注意到的是,在程序退出之前,我的消费者类的析构函数被调用了。

using System;
using System.IO;
using System.Runtime.Serialization;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Util;

namespace RabbitMQClient
{
  /// <summary>
  /// Blocking consumer for a single RabbitMQ queue. Messages are pulled through a
  /// <see cref="QueueingBasicConsumer"/> into a local buffer and handed out one at a
  /// time by <see cref="Dequeue"/>; the connection is rebuilt when a dequeue fails.
  /// </summary>
  public class MessageQueueConsumer : IHealthVerifiable
  {
    /// <summary>
    /// Thrown by <see cref="Dequeue"/> when a timeout was supplied and no message
    /// arrived before it elapsed.
    /// </summary>
    public class TimeoutException : Exception { }

    // Have to do this because, somehow, SharedQueue implementation of IEnumerable is faulty:
    // the Count() extension method hangs and never returns. This subclass reads the size of
    // the underlying queue directly instead.
    private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
    {
      public int Count()
      {
        // NOTE(review): SharedQueue appears to synchronise on m_queue internally; taking the
        // same monitor keeps this read consistent with concurrent Enqueue/Dequeue - confirm.
        lock (this.m_queue)
        {
          return this.m_queue.Count;
        }
      }
    }

    private const int DEFAULT_ACK_COUNT = 1000;

    // Pause between failed connection attempts and before reconnecting after a connect failure.
    private const int RECONNECT_DELAY_MS = 10000;

    private String connString;
    private QueueingBasicConsumer consumer;
    private IConnection conn;
    private IModel channel;
    private String queueName;
    private BufferQueue buffer;
    private Object locker = new Object();
    private ushort prefetchCount;
    private ushort ackCount;

    /// <summary>
    /// Creates the consumer and connects to the broker. Does not return until a
    /// connection has been established (see <see cref="InitConsumer"/>).
    /// </summary>
    /// <param name="queueName">Name of the queue to consume from.</param>
    /// <param name="connString">AMQP URI assigned to <c>ConnectionFactory.Uri</c>.</param>
    /// <param name="ackCount">
    /// Buffer size at which a batch acknowledgement is sent; defaults to
    /// <c>DEFAULT_ACK_COUNT</c>. The prefetch count is set to twice this value.
    /// </param>
    public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
    {
      this.queueName = queueName;
      this.connString = connString;
      this.ackCount = ackCount ?? DEFAULT_ACK_COUNT;
      this.prefetchCount = (ushort)(this.ackCount * 2);

      InitConsumer();
    }

    ~MessageQueueConsumer()
    {
      // Best-effort safety net only; Close() never throws.
      Close();
    }

    /// <summary>
    /// Closes the channel and the connection. Failures are ignored because the usual
    /// cause is that the resource is already closed.
    /// </summary>
    public void Close()
    {
      // The two resources are guarded separately so that a failure while closing the
      // channel cannot leave the connection open.
      try
      {
        if (channel != null)
        {
          channel.Close(200, queueName + " Goodbye");
        }
      }
      catch { } //if already closed, do nothing

      try
      {
        if (conn != null)
        {
          conn.Close();
        }
      }
      catch { } //if already closed, do nothing
    }

    /// <summary>
    /// (Re)creates connection, channel, buffer and consumer, retrying until it succeeds.
    /// </summary>
    /// <remarks>
    /// Retries are done in a loop with a pause between attempts. The previous
    /// implementation retried by calling itself from the catch block, which recurses
    /// without bound while the broker is unreachable and eventually overflows the
    /// stack - a failure that terminates the process and cannot be caught.
    /// </remarks>
    private void InitConsumer()
    {
      while (true)
      {
        try
        {
          // Release whatever an earlier connection or a half-finished attempt left behind.
          Close();

          ConnectionFactory factory = new ConnectionFactory();
          factory.Uri = connString;
          conn = factory.CreateConnection();
          channel = conn.CreateModel();
          channel.BasicQos(0, prefetchCount, false);
          buffer = new BufferQueue();

          consumer = new QueueingBasicConsumer(channel, buffer);
          channel.BasicConsume(queueName, false, consumer);
          return;
        }
        catch (Exception)
        {
          // Broker not reachable (or setup failed): wait, then try again.
          Thread.Sleep(RECONNECT_DELAY_MS);
        }
      }
    }

    /// <summary>
    /// Get the next event from the queue
    /// </summary>
    /// <param name="timeout">
    /// Maximum wait passed to the buffer's timed dequeue (milliseconds), or
    /// <c>null</c> to block until a message is available.
    /// </param>
    /// <returns>Event</returns>
    /// <exception cref="TimeoutException">No message arrived within <paramref name="timeout"/>.</exception>
    /// <remarks>
    /// On a connection-related failure the consumer is rebuilt and the dequeue is
    /// attempted once more; an exception from that second attempt propagates.
    /// </remarks>
    public byte[] Dequeue(int? timeout = null)
    {
      lock (locker)
      {
        try
        {
          return AttemptDequeue(timeout);
        }
        catch (TimeoutException)
        {
          // A timeout is an ordinary result for the caller, not a connection fault;
          // without this clause the catch-all below would sleep and reconnect instead.
          throw;
        }
        catch (EndOfStreamException)
        {
          // Network interruption while reading the input stream
          InitConsumer();
          return AttemptDequeue(timeout);
        }
        catch (OperationInterruptedException)
        {
          // The consumer was removed, either through channel or connection closure, or through the
          // action of IModel.BasicCancel().
          // Attempt to reopen and try again
          InitConsumer();
          return AttemptDequeue(timeout);
        }
        catch (ConnectFailureException)
        {
          //Problems connecting to the queue, wait 10sec, then try again. 
          Thread.Sleep(RECONNECT_DELAY_MS);
          InitConsumer();
          return AttemptDequeue(timeout);
        }
        catch (Exception)
        {
          //Any other failure is handled like a connection problem: wait 10sec, then try again. 
          Thread.Sleep(RECONNECT_DELAY_MS);
          InitConsumer();
          return AttemptDequeue(timeout);
        }
      }
    }

    /// <summary>
    /// Takes one message from the consumer's queue, acknowledges in batches, and
    /// returns the message body.
    /// </summary>
    private byte[] AttemptDequeue(int? timeout)
    {
      BasicDeliverEventArgs message;

      if (timeout == null)
      {
        message = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
      }
      else if (!consumer.Queue.Dequeue(timeout.Value, out message))
      {
        throw new TimeoutException();
      }

      // Read the size once so both comparisons see the same value. The ack is sent with
      // multiple = true, i.e. it covers every delivery up to and including this one.
      int buffered = buffer.Count();
      if (buffered == 0 || buffered == ackCount)
      {
        channel.BasicAck(message.DeliveryTag, true);
      }

      try
      {
        return message.Body;
      }
      catch (Exception e)
      {
        throw new SerializationException("Error deserializing queued message:", e);
      }
    }

    /// <summary>
    /// Attempt to connect to queue to see if it is available
    /// </summary>
    /// <returns>true if queue is available</returns>
    /// <remarks>
    /// When the channel is not open this re-initialises the consumer, which blocks
    /// until the broker can be reached again.
    /// </remarks>
    public bool IsHealthy()
    {
      try
      {
        if (!channel.IsOpen)
        {
          InitConsumer();
        }
        return true;
      }
      catch
      {
        return false;
      }
    }
  }
}

知道如何捕获此异常并尝试重试连接吗?

问题出在使用了 QueueingBasicConsumer,它没有实现任何恢复机制。我改用 EventingBasicConsumer 之后,就能成功从故障中恢复了。

namespace RabbitMQClient
{
  /// <summary>
  /// Blocking consumer for a single RabbitMQ queue built on
  /// <see cref="EventingBasicConsumer"/>. Deliveries are pushed into a local buffer by
  /// the Received event and handed out one at a time by <see cref="Dequeue"/>.
  /// </summary>
  public class MessageQueueConsumer : IHealthVerifiable
  {
    /// <summary>
    /// Thrown by <see cref="Dequeue"/> when a timeout was supplied and no message
    /// arrived before it elapsed.
    /// </summary>
    public class TimeoutException : Exception { }

    // Exposes the size of the underlying queue directly.
    private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
    {
      public int Count()
      {
        // NOTE(review): SharedQueue appears to synchronise on m_queue internally; taking the
        // same monitor keeps this read consistent with the delivery thread - confirm.
        lock (this.m_queue)
        {
          return this.m_queue.Count;
        }
      }
    }

    private const int DEFAULT_ACK_COUNT = 1000;

    private String connString;
    private EventingBasicConsumer consumer;
    private IConnection conn;
    private IModel channel;
    private String queueName;
    private BufferQueue buffer;
    private Object locker = new Object();
    private ushort prefetchCount;
    private ushort ackCount;

    /// <summary>
    /// Creates the consumer and connects to the broker. Exceptions raised while
    /// connecting propagate to the caller.
    /// </summary>
    /// <param name="queueName">Name of the queue to consume from.</param>
    /// <param name="connString">AMQP URI assigned to <c>ConnectionFactory.Uri</c>.</param>
    /// <param name="ackCount">
    /// Buffer size at which a batch acknowledgement is sent; defaults to
    /// <c>DEFAULT_ACK_COUNT</c>. The prefetch count is set to twice this value.
    /// </param>
    public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
    {
      this.queueName = queueName;
      this.connString = connString;
      this.ackCount = ackCount ?? DEFAULT_ACK_COUNT;
      this.prefetchCount = (ushort)(this.ackCount * 2);

      InitConsumer();
    }

    ~MessageQueueConsumer()
    {
      // Best-effort safety net only; Close() never throws.
      Close();
    }

    /// <summary>
    /// Closes the channel and the connection. Failures are ignored because the usual
    /// cause is that the resource is already closed.
    /// </summary>
    public void Close()
    {
      try
      {
        if (channel != null)
        {
          channel.Close(200, queueName + " Goodbye");
        }
      }
      catch { } //if already closed, do nothing

      // The connection is closed as well (in its own guard); otherwise every
      // re-initialisation would leave the previous connection open.
      try
      {
        if (conn != null)
        {
          conn.Close();
        }
      }
      catch { } //if already closed, do nothing
    }

    /// <summary>
    /// (Re)creates connection, channel, buffer and consumer. Any failure propagates
    /// to the caller.
    /// </summary>
    private void InitConsumer()
    {
      // Release the previous channel/connection, if there is one.
      Close();

      ConnectionFactory factory = new ConnectionFactory();
      factory.Uri = connString;
      conn = factory.CreateConnection();
      channel = conn.CreateModel();
      channel.BasicQos(0, prefetchCount, false);

      // The handlers capture this local rather than the field, so a late callback
      // from an old channel can never touch the buffer of a newer one.
      BufferQueue queue = new BufferQueue();
      buffer = queue;

      consumer = new EventingBasicConsumer(channel);

      // Handlers are attached BEFORE BasicConsume so that no delivery can arrive
      // while nobody is listening.
      // No throttling or acking happens here: unacknowledged deliveries are capped by
      // the prefetch count, which bounds the buffer, and acknowledgements are sent from
      // AttemptDequeue once messages have actually been taken out.
      consumer.Received += (model, message) => queue.Enqueue(message);

      // Closing the buffer when the consumer is shut down wakes a blocked Dequeue with
      // EndOfStreamException, which is what triggers the reconnect in Dequeue.
      consumer.Shutdown += (model, reason) => queue.Close();

      channel.BasicConsume(queueName, false, consumer);
    }

    /// <summary>
    /// Get the next event from the queue
    /// </summary>
    /// <param name="timeout">
    /// Maximum wait passed to the buffer's timed dequeue (milliseconds), or
    /// <c>null</c> to block until a message is available.
    /// </param>
    /// <returns>Event</returns>
    /// <exception cref="TimeoutException">No message arrived within <paramref name="timeout"/>.</exception>
    /// <remarks>
    /// On a connection-related failure the consumer is rebuilt and the dequeue is
    /// attempted once more; an exception from the rebuild or the second attempt propagates.
    /// </remarks>
    public byte[] Dequeue(int? timeout = null)
    {
      lock (locker)
      {
        try
        {
          return AttemptDequeue(timeout);
        }
        catch (EndOfStreamException)
        {
          // Network interruption while reading the input stream
          InitConsumer();
          return AttemptDequeue(timeout);
        }
        catch (OperationInterruptedException)
        {
          // The consumer was removed, either through channel or connection closure, or through the
          // action of IModel.BasicCancel().
          // Attempt to reopen and try again
          InitConsumer();
          return AttemptDequeue(timeout);
        }
        catch (ConnectFailureException)
        {
          //Problems connecting to the queue, wait 10sec, then try again. 
          Thread.Sleep(10000);
          InitConsumer();
          return AttemptDequeue(timeout);
        }
      }
    }

    /// <summary>
    /// Takes one message from the buffer, acknowledges in batches, and returns the
    /// message body.
    /// </summary>
    private byte[] AttemptDequeue(int? timeout)
    {
      BasicDeliverEventArgs message;

      // Block on the buffer itself instead of polling it with sleeps, and honour the
      // caller's timeout.
      if (timeout == null)
      {
        message = buffer.Dequeue();
      }
      else if (!buffer.Dequeue(timeout.Value, out message))
      {
        throw new TimeoutException();
      }

      // Acknowledge when the buffer has been drained or is down to ackCount entries.
      // The ack is sent with multiple = true, i.e. it covers every delivery up to and
      // including this one.
      int buffered = buffer.Count();
      if (buffered == 0 || buffered == ackCount)
      {
        channel.BasicAck(message.DeliveryTag, true);
      }

      try
      {
        return message.Body;
      }
      catch (Exception e)
      {
        throw new SerializationException("Error deserializing queued message:", e);
      }
    }

    /// <summary>
    /// Attempt to connect to queue to see if it is available
    /// </summary>
    /// <returns>true if queue is available</returns>
    public bool IsHealthy()
    {
      try
      {
        if (!channel.IsOpen)
        {
          InitConsumer();
        }
        return true;
      }
      catch
      {
        return false;
      }
    }
  }
}