如何在 WCF 中使用 RabbitMQ?
How to consume RabbitMQ in WCF?
我有一个场景,其中可执行文件是生产者,WCF 服务是消费者。
WCF服务工作流程如下:
1)服务调用可执行文件(生产者),这个可执行文件是另一个将消息生成到 RabbitMQ 队列的进程。
2)服务必须使用来自 RabbitMQ 队列的消息
3)Returns给客户端的数据。
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Text;
namespace ConnectionServices
{
public class Connection : IConnection
{
public string ConnectSite(string provider, string server, string siteName)
{
InvokeProducer(provider, server, siteName);
string activeInstance = RunRabbitMQ();
return activeInstance;
}
public void InvokeProducer(string provider, string server, string siteName)
{
string siteManagerExePath = @"C:\Users\mbmercha\Documents\Visual Studio 2015\Projects\Producer\Producer\bin\Debug\Producer.exe";
try
{
ProcessStartInfo startInfo = new ProcessStartInfo();
Process siteManagerProcess = new Process();
startInfo.FileName = siteManagerExePath;
startInfo.Arguments = string.Format("{0} {1} {2} {3}", "-b ", provider, server, siteName);
siteManagerProcess.StartInfo = startInfo;
siteManagerProcess.Start();
siteManagerProcess.WaitForExit();
}
catch (Exception e)
{
}
}
public string RunRabbitMQ()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
string activeInstance = null;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("DurableQueue", true, false, false, null);
channel.ExchangeDeclare("DurableExchange", ExchangeType.Topic, true);
channel.QueueBind("DurableQueue", "DurableExchange", "durable");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
activeInstance = message;
};
channel.BasicConsume(queue: "DurableQueue",
autoAck: false,
consumer: consumer);
}
return activeInstance;
}
}
}
到目前为止,服务能够调用可执行文件并在队列中生成消息。
但是服务从第 2 步开始失败,它返回 null 而不是实际消息。
有人可以告诉我我在这里缺少什么吗?
提前致谢。
您永远不会将 activeInstance
设置为 null
以外的任何值。
您似乎正在使用异步 API,这意味着您在 RunRabbitMQ
方法调用完成很久之后才从 RabbitMQ 检索消息...或者如果您返回时没有立即处理所有消费机械。
如果您想同步检索消息 - 在这种情况下,在同步方法调用中 - 您需要等待消息可用。为此,您需要使用 'pull API',即 channel.BasicGet(...)
.
我有一个场景,其中可执行文件是生产者,WCF 服务是消费者。
WCF服务工作流程如下:
1)服务调用可执行文件(生产者),这个可执行文件是另一个将消息生成到 RabbitMQ 队列的进程。
2)服务必须使用来自 RabbitMQ 队列的消息
3)Returns给客户端的数据。
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Text;
namespace ConnectionServices
{
public class Connection : IConnection
{
public string ConnectSite(string provider, string server, string siteName)
{
InvokeProducer(provider, server, siteName);
string activeInstance = RunRabbitMQ();
return activeInstance;
}
public void InvokeProducer(string provider, string server, string siteName)
{
string siteManagerExePath = @"C:\Users\mbmercha\Documents\Visual Studio 2015\Projects\Producer\Producer\bin\Debug\Producer.exe";
try
{
ProcessStartInfo startInfo = new ProcessStartInfo();
Process siteManagerProcess = new Process();
startInfo.FileName = siteManagerExePath;
startInfo.Arguments = string.Format("{0} {1} {2} {3}", "-b ", provider, server, siteName);
siteManagerProcess.StartInfo = startInfo;
siteManagerProcess.Start();
siteManagerProcess.WaitForExit();
}
catch (Exception e)
{
}
}
public string RunRabbitMQ()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
string activeInstance = null;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("DurableQueue", true, false, false, null);
channel.ExchangeDeclare("DurableExchange", ExchangeType.Topic, true);
channel.QueueBind("DurableQueue", "DurableExchange", "durable");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
activeInstance = message;
};
channel.BasicConsume(queue: "DurableQueue",
autoAck: false,
consumer: consumer);
}
return activeInstance;
}
}
}
到目前为止,服务能够调用可执行文件并在队列中生成消息。
但是服务从第 2 步开始失败,它返回 null 而不是实际消息。 有人可以告诉我我在这里缺少什么吗?
提前致谢。
您永远不会将 activeInstance
设置为 null
以外的任何值。
您似乎正在使用异步 API,这意味着您在 RunRabbitMQ
方法调用完成很久之后才从 RabbitMQ 检索消息...或者如果您返回时没有立即处理所有消费机械。
如果您想同步检索消息 - 在这种情况下,在同步方法调用中 - 您需要等待消息可用。为此,您需要使用 'pull API',即 channel.BasicGet(...)
.