RabbitMQ 消费者作为 windows 服务
RabbitMQ consumer as a windows service
我有一个在 .net 中实现“publish/subscribe 模式的 rabbitmq 消费者应用程序,它作为控制台应用程序完美运行,但是当我将其部署为 windows 服务时,它似乎没有保存数据放入 mongodb.
protected override void OnStart(string[] args)
{
try
{
var connectionString = "mongodb://localhost";
var client = new MongoClient(connectionString);
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "test", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
BsonDocument document = BsonDocument.Parse(message);
var database = client.GetDatabase("test");
var collection = database.GetCollection<BsonDocument>("test_collection");
collection.InsertOneAsync(document);
};
channel.BasicConsume(queue: queueName, noAck: true,consumer: consumer);
}
}
}
catch (Exception ex)
{
throw;
}
}
有什么我遗漏的吗?
对我的 Onstart 方法进行以下更改就成功了
protected override void OnStart(string[] args)
{
ConnectionFactory factory = new ConnectionFactory { HostName = localhost" };
var connectionString = "mongodb://localhost";
var client = new MongoClient(connectionString);
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "test", type: "fanout");
string queueName = channel.QueueDeclare();
channel.QueueBind(queueName, "test", "");
this.EventLog.WriteEntry("Waiting for messages");
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer);
while (true)
{
BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var message = Encoding.UTF8.GetString(e.Body);
BsonDocument document = BsonDocument.Parse(message);
var database = client.GetDatabase("test");
var collection = database.GetCollection<BsonDocument>("test_collection");
collection.InsertOneAsync(document);
}
}
}
}
在 OnStart() 中进行忙碌等待是个坏主意,因为操作系统会期待从中得到 return。阅读此处:https://msdn.microsoft.com/en-us/library/zt39148a%28v=vs.110%29.aspx
编辑:上面代码的问题是您在 using 语句中有连接和通道。这样做的全部意义在于一旦超出范围就将它们处置掉。因此,在这种情况下,即使您正在添加事件处理程序,您也会在退出范围并处理通道等后不久。要解决此问题,请将连接、通道和使用者从“OnStart”方法中拉出并让他们成为 class(可能是私有的)成员。这应该让它们保持打开状态,即使您退出该方法并且您的事件应该继续监听。
今天我们需要将 RabbitMQ 消费者作为 windows 服务,并在方法 OnStart 中使用 Timer 解决。
private Timer _timer;
protected override void OnStart(string[] args)
{
_timer = new Timer();
_timer.Interval = 5000;
_timer.Elapsed += new ElapsedEventHandler(this.OnTimer);
_timer.Start();
}
public void OnTimer(object sender, System.Timers.ElapsedEventArgs args)
{
_timer.Enabled = false;
...
}
非常感谢您的帮助,希望也能帮助解决这个问题
下面的答案帮助我解决了这个问题。如上所述,您不应该在 OnStart 方法中使用 using 语句 而不是 return。所以你可以在 OnStart 方法中得到一条消息,但你不能通过帮助 using 语句来声明消费者。
the solution which fixes the problem for me
希望对你也有帮助
我有一个在 .net 中实现“publish/subscribe 模式的 rabbitmq 消费者应用程序,它作为控制台应用程序完美运行,但是当我将其部署为 windows 服务时,它似乎没有保存数据放入 mongodb.
protected override void OnStart(string[] args)
{
try
{
var connectionString = "mongodb://localhost";
var client = new MongoClient(connectionString);
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "test", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
BsonDocument document = BsonDocument.Parse(message);
var database = client.GetDatabase("test");
var collection = database.GetCollection<BsonDocument>("test_collection");
collection.InsertOneAsync(document);
};
channel.BasicConsume(queue: queueName, noAck: true,consumer: consumer);
}
}
}
catch (Exception ex)
{
throw;
}
}
有什么我遗漏的吗?
对我的 Onstart 方法进行以下更改就成功了
protected override void OnStart(string[] args)
{
ConnectionFactory factory = new ConnectionFactory { HostName = localhost" };
var connectionString = "mongodb://localhost";
var client = new MongoClient(connectionString);
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "test", type: "fanout");
string queueName = channel.QueueDeclare();
channel.QueueBind(queueName, "test", "");
this.EventLog.WriteEntry("Waiting for messages");
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer);
while (true)
{
BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var message = Encoding.UTF8.GetString(e.Body);
BsonDocument document = BsonDocument.Parse(message);
var database = client.GetDatabase("test");
var collection = database.GetCollection<BsonDocument>("test_collection");
collection.InsertOneAsync(document);
}
}
}
}
在 OnStart() 中进行忙碌等待是个坏主意,因为操作系统会期待从中得到 return。阅读此处:https://msdn.microsoft.com/en-us/library/zt39148a%28v=vs.110%29.aspx
编辑:上面代码的问题是您在 using 语句中有连接和通道。这样做的全部意义在于一旦超出范围就将它们处置掉。因此,在这种情况下,即使您正在添加事件处理程序,您也会在退出范围并处理通道等后不久。要解决此问题,请将连接、通道和使用者从“OnStart”方法中拉出并让他们成为 class(可能是私有的)成员。这应该让它们保持打开状态,即使您退出该方法并且您的事件应该继续监听。
今天我们需要将 RabbitMQ 消费者作为 windows 服务,并在方法 OnStart 中使用 Timer 解决。
private Timer _timer;
protected override void OnStart(string[] args)
{
_timer = new Timer();
_timer.Interval = 5000;
_timer.Elapsed += new ElapsedEventHandler(this.OnTimer);
_timer.Start();
}
public void OnTimer(object sender, System.Timers.ElapsedEventArgs args)
{
_timer.Enabled = false;
...
}
非常感谢您的帮助,希望也能帮助解决这个问题
下面的答案帮助我解决了这个问题。如上所述,您不应该在 OnStart 方法中使用 using 语句 而不是 return。所以你可以在 OnStart 方法中得到一条消息,但你不能通过帮助 using 语句来声明消费者。
the solution which fixes the problem for me
希望对你也有帮助