Spring 失去连接后 AWS SQS 重新连接
Spring AWS SQS Reconnect After Losing Connection
我正在使用 Spring 云 AWS (1.0.1.RELEASE) Spring 引导至 运行 SQS 消费者。应用程序 运行 没问题,但是当它失去网络连接时(例如,如果我在笔记本电脑上 运行s 时关闭我的 WIFI),我在控制台上看到错误并且应用程序永远不会恢复。它只是挂在那里,在网络可用后不会重新连接。我必须杀死它并把它带上来。我如何强制它自行恢复?
// Spring Boot entry point:
public static void main(String[] args) {
SpringApplication.run(MyConsumerConfiguration.class, args);
}
// Message Listener (A different class)
@MessageMapping(value = "myLogicalQueueName" )
public void receive(MyPOJO object) {
}
我在控制台看到的错误:
Exception in thread "simpleMessageListenerContainer-1" com.amazonaws.AmazonClientException: Unable to execute HTTP request: sqs.us-east-1.amazonaws.com
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:473)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:297)
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2422)
at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1130)
at com.amazonaws.services.sqs.AmazonSQSAsyncClient.call(AmazonSQSAsyncClient.java:1678)
at com.amazonaws.services.sqs.AmazonSQSAsyncClient.call(AmazonSQSAsyncClient.java:1676)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745
我刚刚弄明白为什么SQS在网络连接丢失后无法重新连接的问题。
实际上 org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.java
的当前 Spring AWS 实施似乎存在问题
private class AsynchronousMessageListener implements Runnable {
private final QueueAttributes queueAttributes;
private final String logicalQueueName;
private AsynchronousMessageListener(String logicalQueueName, QueueAttributes queueAttributes) {
this.logicalQueueName = logicalQueueName;
this.queueAttributes = queueAttributes;
}
@Override
public void run() {
while (isRunning()) {
ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
for (Message message : receiveMessageResult.getMessages()) {
if (isRunning()) {
MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
} else {
break;
}
}
try {
messageBatchLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
以上代码启动了一个新线程,该线程轮询 SQS 队列以获取消息。一旦网络连接断开 getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest())
就会抛出 UnknownHostException
,这在代码中未处理并导致线程终止。
所以当稍后建立网络连接时,没有线程轮询队列来检索数据。
我已经就此向 Spring 提出了一个问题。以下是link:https://github.com/spring-cloud/spring-cloud-aws/issues/82
希望这能说明一切。
我正在使用 Spring 云 AWS (1.0.1.RELEASE) Spring 引导至 运行 SQS 消费者。应用程序 运行 没问题,但是当它失去网络连接时(例如,如果我在笔记本电脑上 运行s 时关闭我的 WIFI),我在控制台上看到错误并且应用程序永远不会恢复。它只是挂在那里,在网络可用后不会重新连接。我必须杀死它并把它带上来。我如何强制它自行恢复?
// Spring Boot entry point:
public static void main(String[] args) {
SpringApplication.run(MyConsumerConfiguration.class, args);
}
// Message Listener (A different class)
@MessageMapping(value = "myLogicalQueueName" )
public void receive(MyPOJO object) {
}
我在控制台看到的错误:
Exception in thread "simpleMessageListenerContainer-1" com.amazonaws.AmazonClientException: Unable to execute HTTP request: sqs.us-east-1.amazonaws.com at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:473) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:297) at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2422) at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1130) at com.amazonaws.services.sqs.AmazonSQSAsyncClient.call(AmazonSQSAsyncClient.java:1678) at com.amazonaws.services.sqs.AmazonSQSAsyncClient.call(AmazonSQSAsyncClient.java:1676) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745
我刚刚弄明白为什么SQS在网络连接丢失后无法重新连接的问题。
实际上 org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.java
private class AsynchronousMessageListener implements Runnable {
private final QueueAttributes queueAttributes;
private final String logicalQueueName;
private AsynchronousMessageListener(String logicalQueueName, QueueAttributes queueAttributes) {
this.logicalQueueName = logicalQueueName;
this.queueAttributes = queueAttributes;
}
@Override
public void run() {
while (isRunning()) {
ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
for (Message message : receiveMessageResult.getMessages()) {
if (isRunning()) {
MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
} else {
break;
}
}
try {
messageBatchLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
以上代码启动了一个新线程,该线程轮询 SQS 队列以获取消息。一旦网络连接断开 getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest())
就会抛出 UnknownHostException
,这在代码中未处理并导致线程终止。
所以当稍后建立网络连接时,没有线程轮询队列来检索数据。
我已经就此向 Spring 提出了一个问题。以下是link:https://github.com/spring-cloud/spring-cloud-aws/issues/82
希望这能说明一切。