使用 AmazonSQSClient 的消息消耗缓慢
Slow message consumption using AmazonSQSClient
所以,我在 spring jms 50-100 中使用并发,允许最大连接数高达 200。一切都按预期工作,但如果我尝试从队列中检索 100k 条消息,我的意思是有 100k 条消息我的 sqs 和我通过 spring jms 正常方法阅读它们。
@JmsListener
Public void process (String message) {
count++;
Println (count);
//code
}
我在我的控制台中看到了所有日志,但在大约 17k 之后它开始抛出异常
类似于:aws sdk 异常:端口已在使用中。
为什么我会看到这个异常,怎么办。我摆脱它?
我试着在互联网上寻找它。找不到任何东西。
我的设置:
并发50-100
为每个任务设置消息:50
客户确认
timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect
更新:我查找了问题,似乎正在创建新的套接字,直到每个套接字都用完为止。
我的 spring jms 版本是 4.3.10
要重现此问题,只需将最大连接数设置为 200,将货币设置为 50-100,然后将一些 40k 消息推送到 sqs 队列即可进行上述配置。可以将 https://github.com/adamw/elasticmq 用作本地复制 Amazon sqs 的堆栈服务器。完成后到这里。评论 jms 侦听器并使用 soap ui 负载测试并调用发送消息以触发许多消息。仅仅因为你注释了@jmslistener 注解,它就不会从队列中消费消息。一旦您看到您已经发送了 40k 条消息,请停止。取消注释@jmslistener 并重新启动服务器。
更新:
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setErrorHandler(Throwable::printStackTrace);
factory.setConcurrency("50-100");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
更新:
SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
更新:
客户端配置详情:
Protocol : HTTP
Max connections : 200
更新:
我使用了缓存连接工厂 class,看来。我在堆栈溢出和他们的官方文档中阅读了不使用缓存连接工厂 class 和默认的 jms 侦听器容器工厂。
它给出了我之前遇到的相同错误。
更新
我的目标是达到 500 tps,即我应该能够消耗那么多。所以我尝试了这个方法,似乎我可以达到 100-200,但不会超过那个。加上这个东西是高并发的阻塞..如果你使用它..如果你有更好的解决方案来实现它..我洗耳恭听。
**已更新**
我正在使用 amazonsqsclient
消费者饥饿
JMS 客户端倾向于实施的一种可能的优化是消息消费缓冲区或 "prefetch"。该缓冲区有时可通过消息数量或缓冲区大小(以字节为单位)进行调整。
目的是防止消费者每次收到消息都去服务器,而不是批量拉取多条消息。
在您有许多 "fast consumers" 的环境中(这是这些库可能采取的固执己见的观点),此预取设置为稍高的默认值以尽量减少这些往返。
但是,在消息消费者速度较慢的环境中,这种预取可能是个问题。慢速消费者正在阻止来自更快消费者的那些预取消息的消息消费。在高并发环境下,这会很快造成饥饿。
在这种情况下 SQSConnectionFactory
有一个 property for this:
SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
sqsConnectionFactory.setNumberOfMessagesToPrefetch(0);
生产者饥饿(即通过 JmsTemplate
)
这些 JMS 实现期望通过一些中介连接到代理是很常见的。这些中介实际上缓存和重用连接或使用池机制重用它们。在 Java EE 世界中,这通常由 Java EE 服务器上的 JCA 适配器或其他方法处理。
由于 Spring JMS 的工作方式,它需要 ConnectionFactory
的中间委托来执行此 caching/pooling。否则,当 Spring JMS 想要连接到代理时,它会尝试 打开一个新的连接和会话 (!) 每次你想对代理做些什么。
为了解决这个问题,Spring 提供了一些选项。最简单的是 CachingConnectionFactory
,它缓存单个 Connection
,并允许在 Connection
上打开多个 Session
。将其添加到上面的 @Configuration
的简单方法如下:
@Bean
public ConnectionFactory connectionFactory(AmazonSQSClient amazonSQSclient) {
SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);
// Doing the following is key!
CachingConnectionFactory connectionfactory = new CachingConnectionFactory();
connectionfactory.setTargetConnectionFactory(sqsConnectionFactory);
// Set the #connectionfactory properties to your liking here...
return connectionFactory;
}
如果你想要一些更奇特的东西作为 JMS 池解决方案(除了多个 Session
之外,它还会为你池 Connections
和 MessageProducer
),你可以使用相当新的 PooledJMS project 的 JmsPoolConnectionFactory
等,来自他们的图书馆。
所以,我在 spring jms 50-100 中使用并发,允许最大连接数高达 200。一切都按预期工作,但如果我尝试从队列中检索 100k 条消息,我的意思是有 100k 条消息我的 sqs 和我通过 spring jms 正常方法阅读它们。
@JmsListener
Public void process (String message) {
count++;
Println (count);
//code
}
我在我的控制台中看到了所有日志,但在大约 17k 之后它开始抛出异常
类似于:aws sdk 异常:端口已在使用中。
为什么我会看到这个异常,怎么办。我摆脱它?
我试着在互联网上寻找它。找不到任何东西。
我的设置:
并发50-100
为每个任务设置消息:50
客户确认
timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect
更新:我查找了问题,似乎正在创建新的套接字,直到每个套接字都用完为止。
我的 spring jms 版本是 4.3.10
要重现此问题,只需将最大连接数设置为 200,将货币设置为 50-100,然后将一些 40k 消息推送到 sqs 队列即可进行上述配置。可以将 https://github.com/adamw/elasticmq 用作本地复制 Amazon sqs 的堆栈服务器。完成后到这里。评论 jms 侦听器并使用 soap ui 负载测试并调用发送消息以触发许多消息。仅仅因为你注释了@jmslistener 注解,它就不会从队列中消费消息。一旦您看到您已经发送了 40k 条消息,请停止。取消注释@jmslistener 并重新启动服务器。
更新:
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setErrorHandler(Throwable::printStackTrace);
factory.setConcurrency("50-100");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
更新:
SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
更新:
客户端配置详情:
Protocol : HTTP
Max connections : 200
更新:
我使用了缓存连接工厂 class,看来。我在堆栈溢出和他们的官方文档中阅读了不使用缓存连接工厂 class 和默认的 jms 侦听器容器工厂。
它给出了我之前遇到的相同错误。
更新
我的目标是达到 500 tps,即我应该能够消耗那么多。所以我尝试了这个方法,似乎我可以达到 100-200,但不会超过那个。加上这个东西是高并发的阻塞..如果你使用它..如果你有更好的解决方案来实现它..我洗耳恭听。
**已更新**
我正在使用 amazonsqsclient
消费者饥饿
JMS 客户端倾向于实施的一种可能的优化是消息消费缓冲区或 "prefetch"。该缓冲区有时可通过消息数量或缓冲区大小(以字节为单位)进行调整。
目的是防止消费者每次收到消息都去服务器,而不是批量拉取多条消息。
在您有许多 "fast consumers" 的环境中(这是这些库可能采取的固执己见的观点),此预取设置为稍高的默认值以尽量减少这些往返。
但是,在消息消费者速度较慢的环境中,这种预取可能是个问题。慢速消费者正在阻止来自更快消费者的那些预取消息的消息消费。在高并发环境下,这会很快造成饥饿。
在这种情况下 SQSConnectionFactory
有一个 property for this:
SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
sqsConnectionFactory.setNumberOfMessagesToPrefetch(0);
生产者饥饿(即通过 JmsTemplate
)
这些 JMS 实现期望通过一些中介连接到代理是很常见的。这些中介实际上缓存和重用连接或使用池机制重用它们。在 Java EE 世界中,这通常由 Java EE 服务器上的 JCA 适配器或其他方法处理。
由于 Spring JMS 的工作方式,它需要 ConnectionFactory
的中间委托来执行此 caching/pooling。否则,当 Spring JMS 想要连接到代理时,它会尝试 打开一个新的连接和会话 (!) 每次你想对代理做些什么。
为了解决这个问题,Spring 提供了一些选项。最简单的是 CachingConnectionFactory
,它缓存单个 Connection
,并允许在 Connection
上打开多个 Session
。将其添加到上面的 @Configuration
的简单方法如下:
@Bean
public ConnectionFactory connectionFactory(AmazonSQSClient amazonSQSclient) {
SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);
// Doing the following is key!
CachingConnectionFactory connectionfactory = new CachingConnectionFactory();
connectionfactory.setTargetConnectionFactory(sqsConnectionFactory);
// Set the #connectionfactory properties to your liking here...
return connectionFactory;
}
如果你想要一些更奇特的东西作为 JMS 池解决方案(除了多个 Session
之外,它还会为你池 Connections
和 MessageProducer
),你可以使用相当新的 PooledJMS project 的 JmsPoolConnectionFactory
等,来自他们的图书馆。