在 ASP.Net Core 3 中使用 Rabbitmq 的 RPC

RPC using Rabbitmq in ASP.Net Core 3

我想使用单个请求实现多个队列

RPC Server.cs

如何从服务器发送多条消息?例如,我创建了如下实例: `

  // Start the RPC server for the "customerContactPersons" queue.
  // NOTE: PublishMessage waits on Console.ReadLine() before returning, so
  // code placed after this call does not run until Enter is pressed.
  var rpcServer1 = new RPCServer();
  rpcServer1.PublishMessage(
      "customerContactPersonsList",
      "customerContactPersons");`

`

// Start the RPC server for the "projects" queue.
// NOTE: PublishMessage waits on Console.ReadLine() before returning, so this
// call blocks the current thread until Enter is pressed.
var rpcServer2 = new RPCServer();
rpcServer2.PublishMessage(
    "ProductInfoList",
    "projects");



  /// <summary>
  /// Serves RPC requests arriving on <paramref name="rpcQueueName"/> and answers
  /// each of them with <paramref name="message"/>. The call blocks the current
  /// thread until Enter is pressed on the console; the connection and channel
  /// are disposed when it returns.
  /// </summary>
  /// <param name="message">Payload sent back as the reply to every request.</param>
  /// <param name="rpcQueueName">Name of the queue on which requests are received.</param>
  public void PublishMessage(string message, string rpcQueueName)
  {
      var factory = new ConnectionFactory() { HostName = "localhost" };
      using (var connection = factory.CreateConnection())
      using (var channel = connection.CreateModel())
      {
          channel.QueueDeclare(queue: rpcQueueName, durable: false,
              exclusive: true, autoDelete: false, arguments: null);
          channel.BasicQos(0, 1, false);

          var consumer = new EventingBasicConsumer(channel);

          // Attach the handler BEFORE starting consumption so that no delivery
          // can arrive while the consumer still has no subscriber.
          consumer.Received += (model, ea) =>
          {
              string response = null;

              var props = ea.BasicProperties;
              var replyProps = channel.CreateBasicProperties();
              replyProps.CorrelationId = props.CorrelationId;

              try
              {
                  Console.WriteLine(" [.] Response message is {0}", message);
                  response = message;
              }
              catch (Exception e)
              {
                  Console.WriteLine(" [.] " + e.Message);
                  response = "There was a error";
              }
              finally
              {
                  // Send whatever was decided above (including the error text),
                  // not the raw input message.
                  var responseBytes = Encoding.UTF8.GetBytes(response ?? string.Empty);

                  // A request without ReplyTo has nowhere to be answered; it is
                  // still acknowledged so it does not stay unacked on the queue.
                  if (!string.IsNullOrEmpty(props.ReplyTo))
                  {
                      // The reply queue is addressed by name, which only the
                      // default exchange ("") routes; a named exchange such as
                      // "topic" has no binding to the caller's reply queue.
                      channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                          basicProperties: replyProps, body: responseBytes);
                  }

                  channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
              }
          };

          channel.BasicConsume(queue: rpcQueueName,
              autoAck: false, consumer: consumer);
          Console.WriteLine(" [x] Awaiting RPC requests");

          // Keep the connection open until the operator presses Enter.
          Console.WriteLine("Press [enter] to exit.");
          Console.ReadLine();
      }
  }

Receiver.cs **接收端(Receiver)负责通过队列取回响应。它调用客户端,并把请求消息和队列名称作为参数传入,以便从声明了同名队列的服务器端获取 JSON 数据;只有当两端的队列名称一致时,响应才会发送到接收端。** 例如,这里作为参数传入的 `projects` 就是队列名称,服务器端声明的也是同一个名称。

// One client instance is shared by both calls; the second argument of
// CallAsync is the name of the RPC queue the request is published to.
RpcClient rpcClient = new RpcClient();
            string customerContactPersons = await rpcClient.CallAsync(
                "",
                "customerContactPersons");
            string response = await rpcClient.CallAsync(
                "",
                "projects");

// Constructor body (its header is not part of this snippet): connects to the
// broker on localhost and prepares the consumer that receives RPC replies.
var factory = new ConnectionFactory() { HostName = "localhost" };

        // Connection and channel are stored in fields; CallAsync uses the channel.
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        // The name returned by the parameterless QueueDeclare() becomes the reply queue.
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            // Match the reply to its pending call by correlation id; a reply with
            // no registered call is ignored.
            if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out TaskCompletionSource<string> tcs))
                return;
            // Decode the reply body as UTF-8 and complete the caller's task with it.
            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);
            tcs.TrySetResult(response);
        };
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to the RPC queue
    /// <paramref name="rpcQueueName"/> and returns a task that completes with the
    /// reply whose correlation id matches this request.
    /// </summary>
    /// <param name="message">Request payload; it is sent UTF-8 encoded.</param>
    /// <param name="rpcQueueName">Routing key (queue name) the request is published to.</param>
    /// <param name="cancellationToken">
    /// Cancels the pending call: the request is forgotten and the returned task
    /// transitions to the canceled state.
    /// </param>
    /// <returns>A task producing the reply body decoded by the reply consumer.</returns>
    public Task<string> CallAsync(string message, string rpcQueueName, CancellationToken cancellationToken = default(CancellationToken))
    {
        IBasicProperties props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;
        var messageBytes = Encoding.UTF8.GetBytes(message);

        // Run continuations asynchronously so that code awaiting the result does
        // not execute inline inside the consumer's Received callback.
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        callbackMapper.TryAdd(correlationId, tcs);

        channel.BasicPublish(
            exchange: "",
            routingKey: rpcQueueName,
            basicProperties: props,
            body: messageBytes);

        // NOTE(review): this starts a consume on the reply queue on every call;
        // consider moving it to the constructor so it happens only once.
        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        // On cancellation, drop the pending entry AND cancel the task; removing
        // the entry alone would leave the awaiting caller waiting forever.
        cancellationToken.Register(() =>
        {
            if (callbackMapper.TryRemove(correlationId, out _))
            {
                tcs.TrySetCanceled(cancellationToken);
            }
        });
        return tcs.Task;
    }`

我的解决办法是:为每个队列添加一个连接的 using 块,并在末尾调用 Console.ReadLine();,使连接保持打开,以便队列继续使用。

// One disposable per RPC queue is held open until Enter is pressed; the
// using blocks then dispose them in reverse order of creation.
// NOTE(review): this relies on a PublishMessage variant that returns a
// disposable object (presumably the connection) — confirm against its definition.
using (var customerContactConnection = rpcServer.PublishMessage(customercontact, "customercontact_rpc_queue"))
{
    using (var projectConnection = rpcServer.PublishMessage(result, "project_rpc_queue"))
    {
        using (var customerConnection = rpcServer.PublishMessage(customersFromByD, "customer_rpc_queue"))
        {
            Console.WriteLine("Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}