Spring 引导和异步 RabbitMQ RPC

Spring boot and asynchronous RabbitMQ RPC

我正在尝试实施 RabbitMQ RPC 模式 (Request/Response)。

这对我来说是全新的技术。所以我很难过。

这是一个网络应用程序,内置于 spring 启动。

结构:

所以我假设响应代码,因为它需要在一个线程上一直等待请求,我需要主线程 spring 引导应用程序到 运行,应该在单独的线程上在后台。因为我需要它是异步的。

这段代码可以按照我的意愿“异步”工作,但我觉得还有更好的方法,只是我不知道。我不知道这个匿名线程将如何处理多个使用网络应用程序的用户。它不需要完美,但可以接受:)

下面的代码还没有完成,没有完成整个事情(发送对象,动态响应......)这只是测试阶段。

请求代码:

public  String call(String message) throws Exception{
          final String corrID = UUID.randomUUID().toString();

        String replayQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrID).replyTo(replayQueueName).build();

        channel.basicPublish("", requestQueueName,props,message.getBytes());

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrID)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });
        String result  = response.take();
        channel.basicCancel(ctag);
        return result;

并在该处理控制器中调用此方法:

try(Connection connection = factory.newConnection()){
            channel = connection.createChannel();
            System.out.println("Sending request...");

            String response = call("Test_Message");
            System.out.println(response);

        }catch (Exception e){
            e.printStackTrace();
        }

响应代码:

@Bean
    public ConnectionFactory startFactory(){
        return new ConnectionFactory();
    }

    @Bean
    public Connection startCon(ConnectionFactory factory) throws Exception{
        return  factory.newConnection();
    }

    @Bean
    public void reciver(){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{

                    Channel channel = connection.createChannel();
                    channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
                    channel.queuePurge(RPC_QUEUE_NAME);

                    channel.basicQos(1);

                    System.out.println("Awaiting rpc requests");

                    Object monitor = new Object();
                    DeliverCallback deliverCallback = (consumerTag, delivery) ->{
                        AMQP.BasicProperties replayProps = new AMQP.BasicProperties.Builder()
                                .correlationId(delivery.getProperties().getCorrelationId())
                                .build();
                        String response = "RESPONSE_TESTING";
                        String message = new String(delivery.getBody(),"UTF-8");
                        System.out.println(message);
                        channel.basicPublish("",delivery.getProperties().getReplyTo(), replayProps, response.getBytes());
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                        synchronized (monitor){
                            monitor.notify();
                        }

                    };
                    channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
                    while(true){
                        synchronized (monitor){
                            try{
                                monitor.wait();
                            }catch (InterruptedException e){
                                e.printStackTrace();
                            }
                        }
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }

            }
        }).start();



    }

我不太确定您要在这里实现什么,但是鉴于您提供的代码,您似乎在尝试重新发明轮子。由于您使用的是 spring-boot,因此无需创建能够 运行 异步使用消息的整个基础架构。

这已由 Spring 作为开箱即用的 spring-amqp-starter 的一部分提供。例如,在这种情况下,您不必手动轮询队列来获取响应,因为此操作由已声明的 RabbitListener.

处理

我建议您通读文档和各种示例,以便更好地了解如何使用 Rabbit。您可以查看那些 here and here.