在不增加执行时间的情况下多次调用 return DeferredResults 的异步服务
Call async service which return DeferredResults, multiple times without increasing execution time
我的应用程序应该有 2 个核心端点:push、pull 用于发送和获取数据。
拉取操作应该异步进行并产生 DeferredResult。当用户调用 pull service over rest 时,新的 DefferedResult 被创建并存储到 Map<Long, DefferedResult> results = new ConcurrentHashMap<>()
等待新数据的地方或直到超时过期。
推送操作也会调用 user over rest,并且此操作会检查此操作推送的数据接收者的结果图。当map包含recipient的结果时,这些数据设置为他的结果,返回DefferedResult。
这是基本代码:
@Service
public class FooServiceImpl {
Map<Long, DefferedResult> results = new ConcurrentHashMap<>();
@Transactional
@Override
public DeferredResult<String> pull(Long userId) {
// here is database call, String data = fooRepository.getNewData(); where I check if there are some new data in database, and if there are, just return it, if not add deferred result into collection to wait for it
DeferredResult<String> newResult = new DeferredResult<>(5000L);
results.putIfAbsent(userId, newResult);
newResult.onCompletion(() -> results.remove(userId));
// if (data != null)
// newResult.setResult(data);
return newResult;
}
@Transactional
@Override
public void push(String data, Long recipientId) {
// fooRepository.save(data, recipientId);
if (results.containsKey(recipientId)) {
results.get(recipientId).setResult(data);
}
}
}
代码按预期工作,问题是它也应该适用于多个用户。我猜将调用 pull 操作的最大活跃用户最多为 1000。因此每次 pull 调用最多需要 5 秒,正如我在 DefferedResult 中设置的那样,但事实并非如此。
正如您在图片中看到的,如果我立即多次从我的 javascript 客户端调用剩余的拉取操作,您可以看到任务将按顺序执行,而不是同时执行。我最后触发的任务大约需要 25 秒,但我需要当 1000 个用户同时执行拉操作时,该操作最多需要 5 秒 + 延迟。
如何配置我的应用程序以同时执行这些任务并确保每个任务大约 5 秒或更短(当另一个用户向等待的用户发送内容时)?我尝试将此配置添加到 属性 文件中:
server.tomcat.max-threads=1000
还有这个配置:
@Configuration
public class AsyncConfig extends AsyncSupportConfigurer {
@Override
protected AsyncTaskExecutor getTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(1000);
taskExecutor.initialize();
return taskExecutor;
}
}
但是没有用,还是一样的结果。你能帮我配置一下吗?
编辑:
这就是我从 angular:
调用此服务的方式
this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
}, (error) => {
console.log('e', error);
});
当我尝试使用像这样的纯 JS 代码多次调用它时:
function httpGet()
{
var xmlHttp = new XMLHttpRequest();
xmlHttp.open( "GET", 'http://localhost:8080/api/pull?id=1', true );
xmlHttp.send( null );
return xmlHttp.responseText;
}
setInterval(httpGet, 500);
它将更快地执行每个请求调用(大约 7 秒)。我预计增加是导致服务中的数据库调用,但它仍然比 25 秒好。我在 angular 中调用此服务有问题吗?
编辑 2:
我尝试了另一种测试形式,我使用了 jMeter 而不是浏览器。我在 100 个线程中执行了 100 个请求,结果如下:
如您所见,请求将按 10 个进行处理,在达到 50 个请求后应用程序抛出异常:
java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:667) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.8.jar:na]
at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.internal.SessionImpl.connection(SessionImpl.java:523) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:223) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:207) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle.doGetConnection(HibernateJpaDialect.java:391) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:154) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at sk.moe.zoya.service.impl.FooServiceImpl$$EnhancerBySpringCGLIB$$ebab570a.pull(<generated>) ~[classes/:na]
at sk.moe.zoya.web.FooController.pull(FooController.java:25) ~[classes/:na]
at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
2018-06-02 13:21:47.163 WARN 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 WARN 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.164 ERROR 26978 --- [io-8080-exec-69] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection] with root cause
我还在使用存储库的地方注释代码,以确保数据库没有任何内容,并且结果相同。另外,我使用 AtomicLong class.
为每个请求设置了 uniqe userId
编辑 3:
我在评论时发现 @Transactional
一切正常!那你能告诉我如何设置spring的交易量大的操作而不增加延迟吗?
我添加了 spring.datasource.maximumPoolSize=1000
来增加我认为应该增加的池大小,所以唯一的问题是如何使用 @Transactional 加速方法。
每次调用 pull 方法都用 @Transactional 注释,因为我需要首先从数据库加载数据并检查是否有新数据,因为是的,我不必创建等待延迟结果。 push 方法也必须使用 @Transaction 进行注释,因为我首先需要将接收到的数据存储在数据库中,然后将该值设置为等待结果。对于我的数据,我使用的是 Postgres。
我认为你需要生产者和消费者结构模型。我为你写代码。希望对你有所帮助
这是示例代码:
DeferredResultStrore
@Component
public class DeferredResultStrore {
private Queue<DeferredResult<String>> responseBodyQueue;
private HashMap<String, List<DeferredResult<InterfaceModel>>> groupMap;
private final long resultTimeOut;
public DeferredResultStrore() {
responseBodyQueue = new LinkedBlockingQueue<DeferredResult<String>>();
groupMap = new HashMap<String, List<DeferredResult<InterfaceModel>>>();
// write time.
resultTimeOut = 1000 * 60 * 60;
}
public Queue<DeferredResult<String>> getResponseBodyQueue() {
return responseBodyQueue;
}
public HashMap<String, List<DeferredResult<InterfaceModel>>> getGroupMap() {
return groupMap;
}
public long getResultTimeOut() {
return resultTimeOut;
}
}
DeferredResultService
public interface DeferredResultService {
public DeferredResult<?> biteResponse(HttpServletResponse resp, HttpServletRequest req);
public DeferredResult<?> biteGroupResponse(String key, HttpServletResponse resp);
}
DeferredResultServiceImpl
@Service
public class DeferredResultServiceImpl implements DeferredResultService {
@Autowired
private DeferredResultStrore deferredResultStore;
@Override
public DeferredResult<?> biteResponse(final HttpServletResponse resp, HttpServletRequest req) {
final DeferredResult<String> defResult = new DeferredResult<String>(deferredResultStore.getResultTimeOut());
removeObserver(resp, defResult, null);
deferredResultStore.getResponseBodyQueue().add(defResult);
return defResult;
}
@Override
public DeferredResult<?> biteGroupResponse(String key, final HttpServletResponse resp) {
final DeferredResult<InterfaceModel> defResult = new DeferredResult<InterfaceModel>(
deferredResultStore.getResultTimeOut());
List<DeferredResult<InterfaceModel>> defResultList = null;
removeObserver(resp, defResult, key);
if (deferredResultStore.getGroupMap().containsKey(key)) {
defResultList = deferredResultStore.getGroupMap().get(key);
defResultList.add(defResult);
} else {
defResultList = new ArrayList<DeferredResult<InterfaceModel>>();
defResultList.add(defResult);
deferredResultStore.getGroupMap().put(key, defResultList);
}
return defResult;
}
private void removeObserver(final HttpServletResponse resp, final DeferredResult<?> defResult, final String key) {
defResult.onCompletion(new Runnable() {
public void run() {
if (key != null) {
List<DeferredResult<InterfaceModel>> defResultList = deferredResultStore.getGroupMap().get(key);
if (defResultList != null) {
for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
if (deferredResult.hashCode() == defResult.hashCode()) {
defResultList.remove(deferredResult);
}
}
}
} else {
if (!deferredResultStore.getResponseBodyQueue().isEmpty()) {
deferredResultStore.getResponseBodyQueue().remove(defResult);
}
}
}
});
defResult.onTimeout(new Runnable() {
public void run() {
// 206
resp.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
if (key != null) {
List<DeferredResult<InterfaceModel>> defResultList = deferredResultStore.getGroupMap().get(key);
if (defResultList != null) {
for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
if (deferredResult.hashCode() == defResult.hashCode()) {
InterfaceModel model = new InterfaceModel();
model.setId(key);
model.setMessage("onTimeout");
deferredResult.setErrorResult(model);
defResultList.remove(deferredResult);
}
}
}
} else {
defResult.setErrorResult("onTimeout");
deferredResultStore.getResponseBodyQueue().remove(defResult);
}
}
});
}
}
推送服务
public interface PushService {
public boolean pushMessage(String message);
public boolean pushGroupMessage(String key, String topic, HttpServletResponse resp);
}
PushServiceImpl
@Service
public class PushServiceImpl implements PushService {
@Autowired
private DeferredResultStrore deferredResultStore;
@Override
public boolean pushMessage(String message) {
if (!deferredResultStore.getResponseBodyQueue().isEmpty()) {
for (DeferredResult<String> deferredResult : deferredResultStore.getResponseBodyQueue()) {
deferredResult.setResult(message);
}
deferredResultStore.getResponseBodyQueue().remove();
}
return true;
}
@Override
public boolean pushGroupMessage(String key, String topic, HttpServletResponse resp) {
List<DeferredResult<InterfaceModel>> defResultList = null;
// select data in DB. that is sample group push service. need to connect db.
InterfaceModel model = new InterfaceModel();
model.setMessage("write group message.");
model.setId(key);
if (deferredResultStore.getGroupMap().containsKey(key)) {
defResultList = deferredResultStore.getGroupMap().get(key);
for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
deferredResult.setResult(model);
}
deferredResultStore.getGroupMap().remove(key);
}
return true;
}
}
界面模型
public class InterfaceModel {
private String message;
private int idx;
private String id;
// DB Column
public InterfaceModel() {
// TODO Auto-generated constructor stub
}
public InterfaceModel(String message, int idx, String id) {
this.message = message;
this.idx = idx;
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public int getIdx() {
return idx;
}
public void setIdx(int idx) {
this.idx = idx;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
web.xml
异步支持在设置中非常重要。
<servlet>
<servlet-name>appServlet</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>/WEB-INF/spring/appServlet/servlet-context.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
<async-supported>true</async-supported>
</servlet>
Java基础
@Bean
public ServletRegistrationBean dispatcherServlet() {
ServletRegistrationBean registration = new ServletRegistrationBean(
new DispatcherServlet(), "/");
registration.setAsyncSupported(true);
return registration;
}
事实上:
A DeferredResult is associated with an open request. When the request
completes, the DeferredResult is removed from the map, and then, the
client issues a new long polling request, which adds a new
DeferredResult instance
Spring Boot 会自动将应用程序上下文中的所有 Servlet bean 注册到 servlet 容器。默认情况下,async supported 设置为 true,因此除了为您的 Servlet 创建一个 bean 之外,您无需做任何事情。
@Aligtor,给你 => public @interface EnableAsync
启用 Spring 的异步方法执行功能,类似于 Spring 的 XML 命名空间中的功能。
正如许多人提到的那样,这不是测试性能的正确方法。您要求在特定时间段执行自动请求,就像您在 XMLHttpRequest 中所做的那样。您可以使用 interval
of Observable
作为:
import {Observable} from "rxjs/Observable";
import {Subscription} from "rxjs/Subscription";
private _intervalSubscription: Subscription;
ngOnInit() {
this._intervalSubscription = Observable.interval(500).subscribe(x => {
this.getDataFromServer();
});
}
ngOnDestroy(): void {
this._intervalSubscription.unsubscribe();
}
getDataFromServer() {
// do your network call
this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
}, (error) => {
console.log('e', error);
});
}
这是从客户端进行轮询的最佳方式。
编辑 1
private prevRequestTime: number;
ngAfterViewInit(): void {
this.getDataFromServer();
}
getDataFromServer() {
this.prevRequestTime = Date.now();
// do your network call
this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
this.scheduleRequestAgain();
}, (error) => {
console.log('e', error);
this.scheduleRequestAgain();
});
}
scheduleRequestAgain() {
let diff = Date.now() - this.prevRequestTime;
setTimeout(this.getDataFromServer(), diff);
}
这里的问题似乎是您 运行正在断开数据库池中的连接。
你的方法用 @Transaction
标记,但你的控制器也期待方法的结果,即 DeferredResult
尽快交付,以便释放线程。
现在,这就是当您 运行 请求时发生的情况:
@Transaction
功能在 Spring 代理中实现,它必须打开连接,调用您的主题方法,然后提交或回滚事务。
- 因此,当您的控制器调用
fooService.pull
方法时,它实际上是在调用代理。
- 代理必须首先从池中请求连接,然后调用您的服务方法,该方法在该事务中执行一些数据库操作。最后,它必须提交或回滚事务,最后 return 到池的连接。
- 毕竟你的方法 return 是一个
DeferredResult
,然后传递给控制器 return。
现在,问题是 DeferredResult
的设计方式应该是异步使用。换句话说,promise 预计稍后会在其他线程中解决,我们应该尽快释放请求线程。
事实上,Spring 关于 DeferredResult 的文档说:
@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<String>();
// Save the deferredResult somewhere..
return deferredResult;
}
// From some other thread...
deferredResult.setResult(data);
您的代码中的问题正是 DeferredResult
正在同一请求线程中解决。
所以,问题是当 Spring 代理请求连接到数据库池时,当您进行重负载测试时,许多请求会发现池已满并且没有可用连接。所以请求被搁置,但此时你的DeferredResult
还没有创建,所以它的超时功能不存在。
您的请求基本上是在等待数据库池中的某个连接可用。所以,假设 5 秒过去了,然后请求获得连接,现在你得到控制器用来处理响应的 DeferredResult
。最终,5 秒后超时。因此,您必须添加等待来自池的连接的时间和等待 DeferredResult
得到解析的时间。
这就是为什么您可能会看到,当您使用 JMeter 进行测试时,请求时间会随着数据库池中的连接耗尽而逐渐增加。
您可以通过在 application.properties 文件中添加以下内容来为线程池启用一些日志记录:
logging.level.com.zaxxer.hikari=DEBUG
您还可以配置数据库池的大小,甚至添加一些 JMX 支持,这样您就可以从 Java 任务控制中心进行监控:
spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.registerMbeans=true
使用 JMX 支持,您将能够看到数据库池是如何耗尽的。
这里的技巧在于将解析 promise 的逻辑移动到另一个线程:
@Override
public DeferredResult pull(Long previousId, String username) {
DeferredResult result = createPollingResult(previousId, username);
CompletableFuture.runAsync(() -> {
//this is where you encapsulate your db transaction
List<MessageDTO> messages = messageService.findRecents(previousId, username); // should be final or effective final
if (messages.isEmpty()) {
pollingResults.putIfAbsent(username, result);
} else {
result.setResult(messages);
}
});
return result;
}
通过这样做,您的 DeferredResult
会立即被 return 编辑,并且 Spring 可以在释放宝贵的 Tomcat 线程的同时发挥其异步请求处理的魔力。
我的应用程序应该有 2 个核心端点:push、pull 用于发送和获取数据。
拉取操作应该异步进行并产生 DeferredResult。当用户调用 pull service over rest 时,新的 DefferedResult 被创建并存储到 Map<Long, DefferedResult> results = new ConcurrentHashMap<>()
等待新数据的地方或直到超时过期。
推送操作也会调用 user over rest,并且此操作会检查此操作推送的数据接收者的结果图。当map包含recipient的结果时,这些数据设置为他的结果,返回DefferedResult。
这是基本代码:
@Service
public class FooServiceImpl {
Map<Long, DefferedResult> results = new ConcurrentHashMap<>();
@Transactional
@Override
public DeferredResult<String> pull(Long userId) {
// here is database call, String data = fooRepository.getNewData(); where I check if there are some new data in database, and if there are, just return it, if not add deferred result into collection to wait for it
DeferredResult<String> newResult = new DeferredResult<>(5000L);
results.putIfAbsent(userId, newResult);
newResult.onCompletion(() -> results.remove(userId));
// if (data != null)
// newResult.setResult(data);
return newResult;
}
@Transactional
@Override
public void push(String data, Long recipientId) {
// fooRepository.save(data, recipientId);
if (results.containsKey(recipientId)) {
results.get(recipientId).setResult(data);
}
}
}
代码按预期工作,问题是它也应该适用于多个用户。我猜将调用 pull 操作的最大活跃用户最多为 1000。因此每次 pull 调用最多需要 5 秒,正如我在 DefferedResult 中设置的那样,但事实并非如此。
正如您在图片中看到的,如果我立即多次从我的 javascript 客户端调用剩余的拉取操作,您可以看到任务将按顺序执行,而不是同时执行。我最后触发的任务大约需要 25 秒,但我需要当 1000 个用户同时执行拉操作时,该操作最多需要 5 秒 + 延迟。
如何配置我的应用程序以同时执行这些任务并确保每个任务大约 5 秒或更短(当另一个用户向等待的用户发送内容时)?我尝试将此配置添加到 属性 文件中:
server.tomcat.max-threads=1000
还有这个配置:
@Configuration
public class AsyncConfig extends AsyncSupportConfigurer {
@Override
protected AsyncTaskExecutor getTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(1000);
taskExecutor.initialize();
return taskExecutor;
}
}
但是没有用,还是一样的结果。你能帮我配置一下吗?
编辑:
这就是我从 angular:
调用此服务的方式this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
}, (error) => {
console.log('e', error);
});
当我尝试使用像这样的纯 JS 代码多次调用它时:
function httpGet()
{
var xmlHttp = new XMLHttpRequest();
xmlHttp.open( "GET", 'http://localhost:8080/api/pull?id=1', true );
xmlHttp.send( null );
return xmlHttp.responseText;
}
setInterval(httpGet, 500);
它将更快地执行每个请求调用(大约 7 秒)。我预计增加是导致服务中的数据库调用,但它仍然比 25 秒好。我在 angular 中调用此服务有问题吗?
编辑 2:
我尝试了另一种测试形式,我使用了 jMeter 而不是浏览器。我在 100 个线程中执行了 100 个请求,结果如下:
如您所见,请求将按 10 个进行处理,在达到 50 个请求后应用程序抛出异常:
java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:667) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.8.jar:na]
at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.internal.SessionImpl.connection(SessionImpl.java:523) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:223) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:207) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle.doGetConnection(HibernateJpaDialect.java:391) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:154) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at sk.moe.zoya.service.impl.FooServiceImpl$$EnhancerBySpringCGLIB$$ebab570a.pull(<generated>) ~[classes/:na]
at sk.moe.zoya.web.FooController.pull(FooController.java:25) ~[classes/:na]
at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
2018-06-02 13:21:47.163 WARN 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 WARN 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.164 ERROR 26978 --- [io-8080-exec-69] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection] with root cause
我还在使用存储库的地方注释代码,以确保数据库没有任何内容,并且结果相同。另外,我使用 AtomicLong class.
为每个请求设置了 uniqe userId编辑 3:
我在评论时发现 @Transactional
一切正常!那你能告诉我如何设置spring的交易量大的操作而不增加延迟吗?
我添加了 spring.datasource.maximumPoolSize=1000
来增加我认为应该增加的池大小,所以唯一的问题是如何使用 @Transactional 加速方法。
每次调用 pull 方法都用 @Transactional 注释,因为我需要首先从数据库加载数据并检查是否有新数据,因为是的,我不必创建等待延迟结果。 push 方法也必须使用 @Transaction 进行注释,因为我首先需要将接收到的数据存储在数据库中,然后将该值设置为等待结果。对于我的数据,我使用的是 Postgres。
我认为你需要生产者和消费者结构模型。我为你写代码。希望对你有所帮助
这是示例代码:
DeferredResultStrore
@Component
public class DeferredResultStrore {
private Queue<DeferredResult<String>> responseBodyQueue;
private HashMap<String, List<DeferredResult<InterfaceModel>>> groupMap;
private final long resultTimeOut;
public DeferredResultStrore() {
responseBodyQueue = new LinkedBlockingQueue<DeferredResult<String>>();
groupMap = new HashMap<String, List<DeferredResult<InterfaceModel>>>();
// write time.
resultTimeOut = 1000 * 60 * 60;
}
public Queue<DeferredResult<String>> getResponseBodyQueue() {
return responseBodyQueue;
}
public HashMap<String, List<DeferredResult<InterfaceModel>>> getGroupMap() {
return groupMap;
}
public long getResultTimeOut() {
return resultTimeOut;
}
}
DeferredResultService
public interface DeferredResultService {
public DeferredResult<?> biteResponse(HttpServletResponse resp, HttpServletRequest req);
public DeferredResult<?> biteGroupResponse(String key, HttpServletResponse resp);
}
DeferredResultServiceImpl
@Service
public class DeferredResultServiceImpl implements DeferredResultService {
@Autowired
private DeferredResultStrore deferredResultStore;
@Override
public DeferredResult<?> biteResponse(final HttpServletResponse resp, HttpServletRequest req) {
final DeferredResult<String> defResult = new DeferredResult<String>(deferredResultStore.getResultTimeOut());
removeObserver(resp, defResult, null);
deferredResultStore.getResponseBodyQueue().add(defResult);
return defResult;
}
@Override
public DeferredResult<?> biteGroupResponse(String key, final HttpServletResponse resp) {
final DeferredResult<InterfaceModel> defResult = new DeferredResult<InterfaceModel>(
deferredResultStore.getResultTimeOut());
List<DeferredResult<InterfaceModel>> defResultList = null;
removeObserver(resp, defResult, key);
if (deferredResultStore.getGroupMap().containsKey(key)) {
defResultList = deferredResultStore.getGroupMap().get(key);
defResultList.add(defResult);
} else {
defResultList = new ArrayList<DeferredResult<InterfaceModel>>();
defResultList.add(defResult);
deferredResultStore.getGroupMap().put(key, defResultList);
}
return defResult;
}
private void removeObserver(final HttpServletResponse resp, final DeferredResult<?> defResult, final String key) {
defResult.onCompletion(new Runnable() {
public void run() {
if (key != null) {
List<DeferredResult<InterfaceModel>> defResultList = deferredResultStore.getGroupMap().get(key);
if (defResultList != null) {
for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
if (deferredResult.hashCode() == defResult.hashCode()) {
defResultList.remove(deferredResult);
}
}
}
} else {
if (!deferredResultStore.getResponseBodyQueue().isEmpty()) {
deferredResultStore.getResponseBodyQueue().remove(defResult);
}
}
}
});
defResult.onTimeout(new Runnable() {
public void run() {
// 206
resp.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
if (key != null) {
List<DeferredResult<InterfaceModel>> defResultList = deferredResultStore.getGroupMap().get(key);
if (defResultList != null) {
for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
if (deferredResult.hashCode() == defResult.hashCode()) {
InterfaceModel model = new InterfaceModel();
model.setId(key);
model.setMessage("onTimeout");
deferredResult.setErrorResult(model);
defResultList.remove(deferredResult);
}
}
}
} else {
defResult.setErrorResult("onTimeout");
deferredResultStore.getResponseBodyQueue().remove(defResult);
}
}
});
}
}
推送服务
public interface PushService {
public boolean pushMessage(String message);
public boolean pushGroupMessage(String key, String topic, HttpServletResponse resp);
}
PushServiceImpl
@Service
public class PushServiceImpl implements PushService {
@Autowired
private DeferredResultStrore deferredResultStore;
@Override
public boolean pushMessage(String message) {
if (!deferredResultStore.getResponseBodyQueue().isEmpty()) {
for (DeferredResult<String> deferredResult : deferredResultStore.getResponseBodyQueue()) {
deferredResult.setResult(message);
}
deferredResultStore.getResponseBodyQueue().remove();
}
return true;
}
@Override
public boolean pushGroupMessage(String key, String topic, HttpServletResponse resp) {
List<DeferredResult<InterfaceModel>> defResultList = null;
// select data in DB. that is sample group push service. need to connect db.
InterfaceModel model = new InterfaceModel();
model.setMessage("write group message.");
model.setId(key);
if (deferredResultStore.getGroupMap().containsKey(key)) {
defResultList = deferredResultStore.getGroupMap().get(key);
for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
deferredResult.setResult(model);
}
deferredResultStore.getGroupMap().remove(key);
}
return true;
}
}
界面模型
public class InterfaceModel {
private String message;
private int idx;
private String id;
// DB Column
public InterfaceModel() {
// TODO Auto-generated constructor stub
}
public InterfaceModel(String message, int idx, String id) {
this.message = message;
this.idx = idx;
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public int getIdx() {
return idx;
}
public void setIdx(int idx) {
this.idx = idx;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
web.xml
异步支持在设置中非常重要。
<servlet>
<servlet-name>appServlet</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>/WEB-INF/spring/appServlet/servlet-context.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
<async-supported>true</async-supported>
</servlet>
Java基础
@Bean
public ServletRegistrationBean dispatcherServlet() {
ServletRegistrationBean registration = new ServletRegistrationBean(
new DispatcherServlet(), "/");
registration.setAsyncSupported(true);
return registration;
}
事实上:
A DeferredResult is associated with an open request. When the request completes, the DeferredResult is removed from the map, and then, the client issues a new long polling request, which adds a new DeferredResult instance
Spring Boot 会自动将应用程序上下文中的所有 Servlet bean 注册到 servlet 容器。默认情况下,async supported 设置为 true,因此除了为您的 Servlet 创建一个 bean 之外,您无需做任何事情。
@Aligtor,给你 => public @interface EnableAsync 启用 Spring 的异步方法执行功能,类似于 Spring 的 XML 命名空间中的功能。
正如许多人提到的那样,这不是测试性能的正确方法。您要求在特定时间段执行自动请求,就像您在 XMLHttpRequest 中所做的那样。您可以使用 interval
of Observable
作为:
import {Observable} from "rxjs/Observable";
import {Subscription} from "rxjs/Subscription";
private _intervalSubscription: Subscription;
ngOnInit() {
this._intervalSubscription = Observable.interval(500).subscribe(x => {
this.getDataFromServer();
});
}
ngOnDestroy(): void {
this._intervalSubscription.unsubscribe();
}
getDataFromServer() {
// do your network call
this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
}, (error) => {
console.log('e', error);
});
}
这是从客户端进行轮询的最佳方式。
编辑 1
private prevRequestTime: number;
ngAfterViewInit(): void {
this.getDataFromServer();
}
getDataFromServer() {
this.prevRequestTime = Date.now();
// do your network call
this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
this.scheduleRequestAgain();
}, (error) => {
console.log('e', error);
this.scheduleRequestAgain();
});
}
scheduleRequestAgain() {
let diff = Date.now() - this.prevRequestTime;
setTimeout(this.getDataFromServer(), diff);
}
这里的问题似乎是您 运行正在断开数据库池中的连接。
你的方法用 @Transaction
标记,但你的控制器也期待方法的结果,即 DeferredResult
尽快交付,以便释放线程。
现在,这就是当您 运行 请求时发生的情况:
@Transaction
功能在 Spring 代理中实现,它必须打开连接,调用您的主题方法,然后提交或回滚事务。- 因此,当您的控制器调用
fooService.pull
方法时,它实际上是在调用代理。 - 代理必须首先从池中请求连接,然后调用您的服务方法,该方法在该事务中执行一些数据库操作。最后,它必须提交或回滚事务,最后 return 到池的连接。
- 毕竟你的方法 return 是一个
DeferredResult
,然后传递给控制器 return。
现在,问题是 DeferredResult
的设计方式应该是异步使用。换句话说,promise 预计稍后会在其他线程中解决,我们应该尽快释放请求线程。
事实上,Spring 关于 DeferredResult 的文档说:
@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<String>();
// Save the deferredResult somewhere..
return deferredResult;
}
// From some other thread...
deferredResult.setResult(data);
您的代码中的问题正是 DeferredResult
正在同一请求线程中解决。
所以,问题是当 Spring 代理请求连接到数据库池时,当您进行重负载测试时,许多请求会发现池已满并且没有可用连接。所以请求被搁置,但此时你的DeferredResult
还没有创建,所以它的超时功能不存在。
您的请求基本上是在等待数据库池中的某个连接可用。所以,假设 5 秒过去了,然后请求获得连接,现在你得到控制器用来处理响应的 DeferredResult
。最终,5 秒后超时。因此,您必须添加等待来自池的连接的时间和等待 DeferredResult
得到解析的时间。
这就是为什么您可能会看到,当您使用 JMeter 进行测试时,请求时间会随着数据库池中的连接耗尽而逐渐增加。
您可以通过在 application.properties 文件中添加以下内容来为线程池启用一些日志记录:
logging.level.com.zaxxer.hikari=DEBUG
您还可以配置数据库池的大小,甚至添加一些 JMX 支持,这样您就可以从 Java 任务控制中心进行监控:
spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.registerMbeans=true
使用 JMX 支持,您将能够看到数据库池是如何耗尽的。
这里的技巧在于将解析 promise 的逻辑移动到另一个线程:
@Override
public DeferredResult pull(Long previousId, String username) {
DeferredResult result = createPollingResult(previousId, username);
CompletableFuture.runAsync(() -> {
//this is where you encapsulate your db transaction
List<MessageDTO> messages = messageService.findRecents(previousId, username); // should be final or effective final
if (messages.isEmpty()) {
pollingResults.putIfAbsent(username, result);
} else {
result.setResult(messages);
}
});
return result;
}
通过这样做,您的 DeferredResult
会立即被 return 编辑,并且 Spring 可以在释放宝贵的 Tomcat 线程的同时发挥其异步请求处理的魔力。