Spring 引导和异步 RabbitMQ RPC
Spring boot and asynchronous RabbitMQ RPC
我正在尝试实施 RabbitMQ RPC 模式 (Request/Response)。
这对我来说是全新的技术。所以我很难过。
这是一个网络应用程序,内置于 spring 启动。
结构:
用户用一些信息填写表单并提交表单,调用处理控制器,例如@{/processUser}
带有来自表单的信息的对象被发送到 RabbitMQ 队列
响应部分发生在其他 spring 项目服务中,即获取请求、构建响应
并将其发回。
构建响应应该在给定的时间范围内对另一个 spring 项目做另一个用户,如果没有,将发回通用响应。
所以我假设响应代码,因为它需要在一个线程上一直等待请求,我需要主线程 spring 引导应用程序到 运行,应该在单独的线程上在后台。因为我需要它是异步的。
这段代码可以按照我的意愿“异步”工作,但我觉得还有更好的方法,只是我不知道。我不知道这个匿名线程将如何处理多个使用网络应用程序的用户。它不需要完美,但可以接受:)
下面的代码还没有完成,没有完成整个事情(发送对象,动态响应......)这只是测试阶段。
请求代码:
public String call(String message) throws Exception{
final String corrID = UUID.randomUUID().toString();
String replayQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrID).replyTo(replayQueueName).build();
channel.basicPublish("", requestQueueName,props,message.getBytes());
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrID)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.take();
channel.basicCancel(ctag);
return result;
并在该处理控制器中调用此方法:
try(Connection connection = factory.newConnection()){
channel = connection.createChannel();
System.out.println("Sending request...");
String response = call("Test_Message");
System.out.println(response);
}catch (Exception e){
e.printStackTrace();
}
响应代码:
@Bean
public ConnectionFactory startFactory(){
return new ConnectionFactory();
}
@Bean
public Connection startCon(ConnectionFactory factory) throws Exception{
return factory.newConnection();
}
@Bean
public void reciver(){
new Thread(new Runnable() {
@Override
public void run() {
try{
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println("Awaiting rpc requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) ->{
AMQP.BasicProperties replayProps = new AMQP.BasicProperties.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "RESPONSE_TESTING";
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(message);
channel.basicPublish("",delivery.getProperties().getReplyTo(), replayProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
synchronized (monitor){
monitor.notify();
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
while(true){
synchronized (monitor){
try{
monitor.wait();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
我正在尝试实施 RabbitMQ RPC 模式 (Request/Response)。
这对我来说是全新的技术。所以我很难过。
这是一个网络应用程序,内置于 spring 启动。
结构:
用户用一些信息填写表单并提交表单,调用处理控制器,例如@{/processUser}
带有来自表单的信息的对象被发送到 RabbitMQ 队列
响应部分发生在其他 spring 项目服务中,即获取请求、构建响应 并将其发回。
构建响应应该在给定的时间范围内对另一个 spring 项目做另一个用户,如果没有,将发回通用响应。
所以我假设响应代码,因为它需要在一个线程上一直等待请求,我需要主线程 spring 引导应用程序到 运行,应该在单独的线程上在后台。因为我需要它是异步的。
这段代码可以按照我的意愿“异步”工作,但我觉得还有更好的方法,只是我不知道。我不知道这个匿名线程将如何处理多个使用网络应用程序的用户。它不需要完美,但可以接受:)
下面的代码还没有完成,没有完成整个事情(发送对象,动态响应......)这只是测试阶段。
请求代码:
public String call(String message) throws Exception{
final String corrID = UUID.randomUUID().toString();
String replayQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrID).replyTo(replayQueueName).build();
channel.basicPublish("", requestQueueName,props,message.getBytes());
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrID)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.take();
channel.basicCancel(ctag);
return result;
并在该处理控制器中调用此方法:
try(Connection connection = factory.newConnection()){
channel = connection.createChannel();
System.out.println("Sending request...");
String response = call("Test_Message");
System.out.println(response);
}catch (Exception e){
e.printStackTrace();
}
响应代码:
@Bean
public ConnectionFactory startFactory(){
return new ConnectionFactory();
}
@Bean
public Connection startCon(ConnectionFactory factory) throws Exception{
return factory.newConnection();
}
@Bean
public void reciver(){
new Thread(new Runnable() {
@Override
public void run() {
try{
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println("Awaiting rpc requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) ->{
AMQP.BasicProperties replayProps = new AMQP.BasicProperties.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "RESPONSE_TESTING";
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(message);
channel.basicPublish("",delivery.getProperties().getReplyTo(), replayProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
synchronized (monitor){
monitor.notify();
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
while(true){
synchronized (monitor){
try{
monitor.wait();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}