应用程序崩溃并退出调试器,而消费者是 运行 & rabbitmq 服务重新启动
Application crashes & exits debugger while consumer is running & rabbit mq service restarts
我有一个 C# 应用程序,我试图在其中测试我的 rabbit mq 客户端的弹性。当消费者 运行 时,我停止了 rabbit mq 服务以查看我的消费者将如何处理。
我几乎所有的消费者都有 try
和 catch
,但是由于后台线程中可能出现异常,我的应用程序在输出 window 中打印如下并且存在调试器。
The thread 'AMQP Connection amqp://test.com:5671' (0x6da18) has exited
with code 0 (0x0).
A first chance exception of type 'System.Net.WebException' occurred in
System.dll
然后存在调试器。我唯一注意到的是我的消费者 class 的析构函数在代码存在之前被调用。
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Util;
namespace RabbitMQClient
{
public class MessageQueueConsumer : IHealthVerifiable
{
public class TimeoutException : Exception { }
// Have to do this because, somehow, SharedQueue implementation of IEnumerable is faulty
// Count() method hangs, and never returns
private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
{
public int Count()
{
return this.m_queue.Count;
}
}
private const int DEFAULT_ACK_COUNT = 1000;
private String connString;
private QueueingBasicConsumer consumer;
private IConnection conn;
private IModel channel;
private String queueName;
private BufferQueue buffer;
private Object locker = new Object();
private ushort prefetchCount;
private ushort ackCount;
public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
{
this.queueName = queueName;
this.connString = connString;
if (ackCount != null)
this.ackCount = ackCount.Value;
else
this.ackCount = DEFAULT_ACK_COUNT;
this.prefetchCount = (ushort)(this.ackCount * 2);
InitConsumer();
}
~MessageQueueConsumer()
{
Close();
}
public void Close()
{
try
{
channel.Close(200, queueName + " Goodbye");
conn.Close();
}
catch { } //if already closed, do nothing
}
private void InitConsumer()
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = connString;
conn = factory.CreateConnection();
channel = conn.CreateModel();
channel.BasicQos(0, prefetchCount, false);
buffer = new BufferQueue();
consumer = new QueueingBasicConsumer(channel, buffer);
channel.BasicConsume(queueName, false, consumer);
}
catch (Exception e)
{
InitConsumer();
}
}
/// <summary>
/// Get the next event from the queue
/// </summary>
/// <returns>Event</returns>
public byte[] Dequeue(int? timeout = null)
{
lock (locker)
{
try
{
return AttemptDequeue(timeout);
}
catch (EndOfStreamException)
{
// Network interruption while reading the input stream
InitConsumer();
return AttemptDequeue(timeout);
}
catch (OperationInterruptedException)
{
// The consumer was removed, either through channel or connection closure, or through the
// action of IModel.BasicCancel().
// Attempt to reopen and try again
InitConsumer();
return AttemptDequeue(timeout);
}
catch (ConnectFailureException)
{
//Problems connecting to the queue, wait 10sec, then try again.
Thread.Sleep(10000);
InitConsumer();
return AttemptDequeue(timeout);
}
catch (Exception e)
{
//Problems connecting to the queue, wait 10sec, then try again.
Thread.Sleep(10000);
InitConsumer();
return AttemptDequeue(timeout);
}
}
}
private byte[] AttemptDequeue(int? tomeout)
{
BasicDeliverEventArgs message;
if (tomeout == null)
message = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
else
{
if (!consumer.Queue.Dequeue(tomeout.Value, out message))
throw new TimeoutException();
}
if (buffer.Count() == 0 || buffer.Count() == ackCount)
channel.BasicAck(message.DeliveryTag, true);
try
{
return message.Body;
}
catch (Exception e)
{
throw new SerializationException("Error deserializing queued message:", e);
}
}
/// <summary>
/// Attempt to connect to queue to see if it is available
/// </summary>
/// <returns>true if queue is available</returns>
public bool IsHealthy()
{
try
{
if (channel.IsOpen)
return true;
else
{
InitConsumer();
return true;
}
}
catch
{
return false;
}
}
}
}
知道如何捕获此异常并尝试重试连接吗?
问题是使用 QueueingBasicConsumer
,它没有实现任何恢复方法。我更改为 EventingBasicConsumer
并且从故障中恢复成功。
namespace RabbitMQClient { public class MessageQueueConsumer : IHealthVerifiable {
public class TimeoutException : Exception { }
private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
{
public int Count()
{
return this.m_queue.Count;
}
}
private const int DEFAULT_ACK_COUNT = 1000;
private String connString;
private EventingBasicConsumer consumer;
private IConnection conn;
private IModel channel;
private String queueName;
private BufferQueue buffer;
private Object locker = new Object();
private ushort prefetchCount;
private ushort ackCount;
public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
{
this.queueName = queueName;
this.connString = connString;
if (ackCount != null)
this.ackCount = ackCount.Value;
else
this.ackCount = DEFAULT_ACK_COUNT;
this.prefetchCount = (ushort)(this.ackCount * 2);
InitConsumer();
}
~MessageQueueConsumer()
{
Close();
}
public void Close()
{
try
{
channel.Close(200, queueName + " Goodbye");
// conn.Close();
}
catch { } //if already closed, do nothing
}
private void InitConsumer()
{
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = connString;
conn = factory.CreateConnection();
channel = conn.CreateModel();
channel.BasicQos(0, prefetchCount, false);
buffer = new BufferQueue();
consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queueName, false, consumer);
// when message is recieved do following
consumer.Received += (model, message) =>
{
if (buffer.Count() > DEFAULT_ACK_COUNT)
Thread.Sleep(3000);
buffer.Enqueue(message);
if (buffer.Count() == 0 || buffer.Count() == ackCount)
channel.BasicAck(message.DeliveryTag, true);
};
}
/// <summary>
/// Get the next event from the queue
/// </summary>
/// <returns>Event</returns>
public byte[] Dequeue(int? timeout = null)
{
lock (locker)
{
try
{
return AttemptDequeue(timeout);
}
catch (EndOfStreamException)
{
// Network interruption while reading the input stream
InitConsumer();
return AttemptDequeue(timeout);
}
catch (OperationInterruptedException)
{
// The consumer was removed, either through channel or connection closure, or through the
// action of IModel.BasicCancel().
// Attempt to reopen and try again
InitConsumer();
return AttemptDequeue(timeout);
}
catch (ConnectFailureException)
{
//Problems connecting to the queue, wait 10sec, then try again.
Thread.Sleep(10000);
InitConsumer();
return AttemptDequeue(timeout);
}
}
}
private byte[] AttemptDequeue(int? tomeout)
{
BasicDeliverEventArgs message;
while (true)
{
//while buffer has no events
if (buffer.Count() == 0)
{
Thread.Sleep(3000);
}
else
{
message = buffer.Dequeue();
break;
}
}
try
{
return message.Body;
}
catch (Exception e)
{
throw new SerializationException("Error deserializing queued message:", e);
}
}
/// <summary>
/// Attempt to connect to queue to see if it is available
/// </summary>
/// <returns>true if queue is available</returns>
public bool IsHealthy()
{
try
{
if (channel.IsOpen)
return true;
else
{
InitConsumer();
return true;
}
}
catch
{
return false;
}
} } }
我有一个 C# 应用程序,我试图在其中测试我的 rabbit mq 客户端的弹性。当消费者 运行 时,我停止了 rabbit mq 服务以查看我的消费者将如何处理。
我几乎所有的消费者都有 try
和 catch
,但是由于后台线程中可能出现异常,我的应用程序在输出 window 中打印如下并且存在调试器。
The thread 'AMQP Connection amqp://test.com:5671' (0x6da18) has exited with code 0 (0x0).
A first chance exception of type 'System.Net.WebException' occurred in System.dll
然后存在调试器。我唯一注意到的是我的消费者 class 的析构函数在代码存在之前被调用。
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Util;
namespace RabbitMQClient
{
public class MessageQueueConsumer : IHealthVerifiable
{
public class TimeoutException : Exception { }
// Have to do this because, somehow, SharedQueue implementation of IEnumerable is faulty
// Count() method hangs, and never returns
private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
{
public int Count()
{
return this.m_queue.Count;
}
}
private const int DEFAULT_ACK_COUNT = 1000;
private String connString;
private QueueingBasicConsumer consumer;
private IConnection conn;
private IModel channel;
private String queueName;
private BufferQueue buffer;
private Object locker = new Object();
private ushort prefetchCount;
private ushort ackCount;
public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
{
this.queueName = queueName;
this.connString = connString;
if (ackCount != null)
this.ackCount = ackCount.Value;
else
this.ackCount = DEFAULT_ACK_COUNT;
this.prefetchCount = (ushort)(this.ackCount * 2);
InitConsumer();
}
~MessageQueueConsumer()
{
Close();
}
public void Close()
{
try
{
channel.Close(200, queueName + " Goodbye");
conn.Close();
}
catch { } //if already closed, do nothing
}
private void InitConsumer()
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = connString;
conn = factory.CreateConnection();
channel = conn.CreateModel();
channel.BasicQos(0, prefetchCount, false);
buffer = new BufferQueue();
consumer = new QueueingBasicConsumer(channel, buffer);
channel.BasicConsume(queueName, false, consumer);
}
catch (Exception e)
{
InitConsumer();
}
}
/// <summary>
/// Get the next event from the queue
/// </summary>
/// <returns>Event</returns>
public byte[] Dequeue(int? timeout = null)
{
lock (locker)
{
try
{
return AttemptDequeue(timeout);
}
catch (EndOfStreamException)
{
// Network interruption while reading the input stream
InitConsumer();
return AttemptDequeue(timeout);
}
catch (OperationInterruptedException)
{
// The consumer was removed, either through channel or connection closure, or through the
// action of IModel.BasicCancel().
// Attempt to reopen and try again
InitConsumer();
return AttemptDequeue(timeout);
}
catch (ConnectFailureException)
{
//Problems connecting to the queue, wait 10sec, then try again.
Thread.Sleep(10000);
InitConsumer();
return AttemptDequeue(timeout);
}
catch (Exception e)
{
//Problems connecting to the queue, wait 10sec, then try again.
Thread.Sleep(10000);
InitConsumer();
return AttemptDequeue(timeout);
}
}
}
private byte[] AttemptDequeue(int? tomeout)
{
BasicDeliverEventArgs message;
if (tomeout == null)
message = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
else
{
if (!consumer.Queue.Dequeue(tomeout.Value, out message))
throw new TimeoutException();
}
if (buffer.Count() == 0 || buffer.Count() == ackCount)
channel.BasicAck(message.DeliveryTag, true);
try
{
return message.Body;
}
catch (Exception e)
{
throw new SerializationException("Error deserializing queued message:", e);
}
}
/// <summary>
/// Attempt to connect to queue to see if it is available
/// </summary>
/// <returns>true if queue is available</returns>
public bool IsHealthy()
{
try
{
if (channel.IsOpen)
return true;
else
{
InitConsumer();
return true;
}
}
catch
{
return false;
}
}
}
}
知道如何捕获此异常并尝试重试连接吗?
问题是使用 QueueingBasicConsumer
,它没有实现任何恢复方法。我更改为 EventingBasicConsumer
并且从故障中恢复成功。
namespace RabbitMQClient { public class MessageQueueConsumer : IHealthVerifiable {
public class TimeoutException : Exception { }
private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
{
public int Count()
{
return this.m_queue.Count;
}
}
private const int DEFAULT_ACK_COUNT = 1000;
private String connString;
private EventingBasicConsumer consumer;
private IConnection conn;
private IModel channel;
private String queueName;
private BufferQueue buffer;
private Object locker = new Object();
private ushort prefetchCount;
private ushort ackCount;
public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
{
this.queueName = queueName;
this.connString = connString;
if (ackCount != null)
this.ackCount = ackCount.Value;
else
this.ackCount = DEFAULT_ACK_COUNT;
this.prefetchCount = (ushort)(this.ackCount * 2);
InitConsumer();
}
~MessageQueueConsumer()
{
Close();
}
public void Close()
{
try
{
channel.Close(200, queueName + " Goodbye");
// conn.Close();
}
catch { } //if already closed, do nothing
}
private void InitConsumer()
{
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = connString;
conn = factory.CreateConnection();
channel = conn.CreateModel();
channel.BasicQos(0, prefetchCount, false);
buffer = new BufferQueue();
consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queueName, false, consumer);
// when message is recieved do following
consumer.Received += (model, message) =>
{
if (buffer.Count() > DEFAULT_ACK_COUNT)
Thread.Sleep(3000);
buffer.Enqueue(message);
if (buffer.Count() == 0 || buffer.Count() == ackCount)
channel.BasicAck(message.DeliveryTag, true);
};
}
/// <summary>
/// Get the next event from the queue
/// </summary>
/// <returns>Event</returns>
public byte[] Dequeue(int? timeout = null)
{
lock (locker)
{
try
{
return AttemptDequeue(timeout);
}
catch (EndOfStreamException)
{
// Network interruption while reading the input stream
InitConsumer();
return AttemptDequeue(timeout);
}
catch (OperationInterruptedException)
{
// The consumer was removed, either through channel or connection closure, or through the
// action of IModel.BasicCancel().
// Attempt to reopen and try again
InitConsumer();
return AttemptDequeue(timeout);
}
catch (ConnectFailureException)
{
//Problems connecting to the queue, wait 10sec, then try again.
Thread.Sleep(10000);
InitConsumer();
return AttemptDequeue(timeout);
}
}
}
private byte[] AttemptDequeue(int? tomeout)
{
BasicDeliverEventArgs message;
while (true)
{
//while buffer has no events
if (buffer.Count() == 0)
{
Thread.Sleep(3000);
}
else
{
message = buffer.Dequeue();
break;
}
}
try
{
return message.Body;
}
catch (Exception e)
{
throw new SerializationException("Error deserializing queued message:", e);
}
}
/// <summary>
/// Attempt to connect to queue to see if it is available
/// </summary>
/// <returns>true if queue is available</returns>
public bool IsHealthy()
{
try
{
if (channel.IsOpen)
return true;
else
{
InitConsumer();
return true;
}
}
catch
{
return false;
}
} } }