在 ASP.Net Core 3 中使用 Rabbitmq 的 RPC
RPC using Rabbitmq in ASP.Net Core 3
我想使用单个请求实现多个队列
RPC Server.cs
如何从服务器发送多条消息
对于前。我创建了实例:
`
RPCServer rpcServer1 = new RPCServer();
rpcServer1.PublishMessage("customerContactPersonsList","customerContactPersons");`
`
RPCServer rpcServer2 = new RPCServer();
rpcServer2.PublishMessage("ProductInfoList", "projects");
public void PublishMessage(string message, string rpcQueueName)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: rpcQueueName, durable: false,
exclusive: true, autoDelete: false, arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: rpcQueueName,
autoAck: false, consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
consumer.Received += (model, ea) =>
{
string response = null;
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
Console.WriteLine(" [.] Response message is)", message);
response = message;
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
response = "There was a error";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange:"topic", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
};
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
}
Receiver.cs
** Receiver 端是通过队列检索响应的地方,它还调用基本上发送请求和队列名称作为参数的客户端,以从声明相同队列的服务器端获取 json 数据,当队列的两个名称都匹配时,响应将发送到接收端。** 例如。这里的项目作为参数是给定的队列名称,服务器端也提到了同样的名称。
var rpcClient = new RpcClient();
var customerContactPersons = await rpcClient.CallAsync("", "customerContactPersons");
var response = await rpcClient.CallAsync("","projects");
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out TaskCompletionSource<string> tcs))
return;
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
};
}
public Task<string> CallAsync(string message, string rpcQueueName, CancellationToken cancellationToken = default(CancellationToken))
{
IBasicProperties props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
var tcs = new TaskCompletionSource<string>();
callbackMapper.TryAdd(correlationId, tcs);
channel.BasicPublish(
exchange: "",
routingKey: rpcQueueName,
basicProperties: props,
body: messageBytes);
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
cancellationToken.Register(() => callbackMapper.TryRemove(correlationId, out var tmp));
return tcs.Task;
}`
我通过为每个队列和末尾添加连接块来解决它 console.readline();保持连接打开以供队列使用。
using (var connection = rpcServer.PublishMessage(customercontact, "customercontact_rpc_queue"))
using (var connectionObject = rpcServer.PublishMessage(result, "project_rpc_queue"))
using (var customersObject = rpcServer.PublishMessage(customersFromByD, "customer_rpc_queue"))
{
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
}
我想使用单个请求实现多个队列
RPC Server.cs
如何从服务器发送多条消息 对于前。我创建了实例: `
RPCServer rpcServer1 = new RPCServer();
rpcServer1.PublishMessage("customerContactPersonsList","customerContactPersons");`
`
RPCServer rpcServer2 = new RPCServer();
rpcServer2.PublishMessage("ProductInfoList", "projects");
public void PublishMessage(string message, string rpcQueueName)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: rpcQueueName, durable: false,
exclusive: true, autoDelete: false, arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: rpcQueueName,
autoAck: false, consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
consumer.Received += (model, ea) =>
{
string response = null;
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
Console.WriteLine(" [.] Response message is)", message);
response = message;
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
response = "There was a error";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange:"topic", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
};
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
}
Receiver.cs ** Receiver 端是通过队列检索响应的地方,它还调用基本上发送请求和队列名称作为参数的客户端,以从声明相同队列的服务器端获取 json 数据,当队列的两个名称都匹配时,响应将发送到接收端。** 例如。这里的项目作为参数是给定的队列名称,服务器端也提到了同样的名称。
var rpcClient = new RpcClient();
var customerContactPersons = await rpcClient.CallAsync("", "customerContactPersons");
var response = await rpcClient.CallAsync("","projects");
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out TaskCompletionSource<string> tcs))
return;
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
};
}
public Task<string> CallAsync(string message, string rpcQueueName, CancellationToken cancellationToken = default(CancellationToken))
{
IBasicProperties props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
var tcs = new TaskCompletionSource<string>();
callbackMapper.TryAdd(correlationId, tcs);
channel.BasicPublish(
exchange: "",
routingKey: rpcQueueName,
basicProperties: props,
body: messageBytes);
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
cancellationToken.Register(() => callbackMapper.TryRemove(correlationId, out var tmp));
return tcs.Task;
}`
我通过为每个队列和末尾添加连接块来解决它 console.readline();保持连接打开以供队列使用。
using (var connection = rpcServer.PublishMessage(customercontact, "customercontact_rpc_queue"))
using (var connectionObject = rpcServer.PublishMessage(result, "project_rpc_queue"))
using (var customersObject = rpcServer.PublishMessage(customersFromByD, "customer_rpc_queue"))
{
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
}