gpt4 book ai didi

java - 在不增加执行时间的情况下多次调用返回 DeferredResults 的异步服务

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:00:18 27 4
gpt4 key购买 nike

我的应用程序应该有 2 个核心端点:pushpull 用于发送和获取数据。

拉取操作应该异步进行并产生 DeferredResult。当用户调用 pull service over rest 时,会创建新的 DefferedResult 并将其存储到 Map<Long, DefferedResult> results = new ConcurrentHashMap<>() 中在哪里等待新数据或直到超时。

推送操作也会调用 user over rest,并且此操作会检查此操作推送的数据接收者的结果图。当map包含接收者的结果时,这些数据被设置为他的结果,返回DefferedResult。

这是基本代码:

@Service
public class FooServiceImpl {
Map<Long, DefferedResult> results = new ConcurrentHashMap<>();

@Transactional
@Override
public DeferredResult<String> pull(Long userId) {
// here is database call, String data = fooRepository.getNewData(); where I check if there are some new data in database, and if there are, just return it, if not add deferred result into collection to wait for it
DeferredResult<String> newResult = new DeferredResult<>(5000L);
results.putIfAbsent(userId, newResult);
newResult.onCompletion(() -> results.remove(userId));

// if (data != null)
// newResult.setResult(data);

return newResult;
}

@Transactional
@Override
public void push(String data, Long recipientId) {
// fooRepository.save(data, recipientId);
if (results.containsKey(recipientId)) {
results.get(recipientId).setResult(data);
}
}
}

代码如我所料工作,问题是它也应该适用于多个用户。我猜将调用 pull 操作的最大活跃用户最多为 1000。因此,正如我在 DefferedResult 中设置的那样,每次 pull 调用最多需要 5 秒,但事实并非如此。

如您在图中所见,如果我立即多次从我的 javascript 客户端调用剩余的拉取操作,您可以看到任务将按顺序执行,而不是同时执行。我最后触发的任务大约需要 25 秒,但我需要当 1000 个用户同时执行拉取操作时,该操作最多需要 5 秒 + 延迟。

enter image description here

如何配置我的应用程序以同时执行这些任务并确保每个任务大约 5 秒或更短(当另一个用户向等待的用户发送内容时)?我尝试将此配置添加到属性文件中:

server.tomcat.max-threads=1000

还有这个配置:

@Configuration
public class AsyncConfig extends AsyncSupportConfigurer {

@Override
protected AsyncTaskExecutor getTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(1000);
taskExecutor.initialize();
return taskExecutor;
}
}

但是没有用,还是一样的结果。你能帮我配置一下吗?

编辑:

这就是我从角度调用此服务的方式:

this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
}, (error) => {
console.log('e', error);
});

当我尝试使用像这样的纯 JS 代码多次调用它时:

function httpGet()
{
var xmlHttp = new XMLHttpRequest();
xmlHttp.open( "GET", 'http://localhost:8080/api/pull?id=1', true );
xmlHttp.send( null );
return xmlHttp.responseText;
}
setInterval(httpGet, 500);

它将更快地执行每个请求调用(大约 7 秒)。我预计增加是导致服务中的数据库调用,但它仍然比 25 秒好。以角度调用此服务是否有问题?

编辑 2:

我尝试了另一种形式的测试,我使用了 jMeter 而不是浏览器。我在 100 个线程中执行了 100 个请求,结果如下:

enter image description here

如您所见,请求将按 10 个处理,在达到 50 个请求后应用程序抛出异常:

java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:667) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.8.jar:na]
at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.internal.SessionImpl.connection(SessionImpl.java:523) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:223) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:207) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle.doGetConnection(HibernateJpaDialect.java:391) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:154) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at sk.moe.zoya.service.impl.FooServiceImpl$$EnhancerBySpringCGLIB$$ebab570a.pull(<generated>) ~[classes/:na]
at sk.moe.zoya.web.FooController.pull(FooController.java:25) ~[classes/:na]
at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

2018-06-02 13:21:47.163 WARN 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 WARN 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.164 ERROR 26978 --- [io-8080-exec-69] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection] with root cause

我还在使用存储库的地方注释代码,以确保数据库中没有任何内容,并且结果相同。我还使用 AtomicLong 类为每个请求设置了唯一的 userId。

编辑 3:

我评论的时候也发现了@Transactional一切正常!那你能告诉我如何在不增加延迟的情况下为大量操作设置spring的事务吗?

我添加了 spring.datasource.maximumPoolSize=1000增加我认为应该增加的池大小,所以唯一的问题是如何使用 @Transactional 加速方法。

每次调用 pull 方法都用 @Transactional 注释,因为我需要首先从数据库加载数据并检查是否有新数据,因为是的,我不必创建等待延迟结果。 push 方法也必须使用 @Transaction 进行注释,因为我首先需要将接收到的数据存储在数据库中,然后将该值设置为等待结果。对于我的数据,我使用的是 Postgres。

最佳答案

这里的问题似乎是数据库池中的连接用完了。

你的方法用 @Transaction 标记,但你的 Controller 也期待方法的结果,即 DeferredResult 尽快交付,以便线程被释放。

现在,这是您运行请求时发生的情况:

  • @Transaction 功能在 Spring 代理中实现,它必须打开一个连接,调用您的主题方法,然后提交或回滚事务。
  • 因此,当您的 Controller 调用fooService.pull 方法时,它实际上是在调用代理。
  • 代理必须首先从池中请求一个连接,然后调用您的服务方法,该方法在该事务中执行一些数据库操作。最后,它必须提交或回滚事务并最终将连接返回到池中。
  • 完成所有这一切后,您的方法返回一个 DeferredResult,然后将其传递给 Controller ​​以供其返回。

现在的问题是 DeferredResult 的设计方式使其应该被异步使用。换句话说,promise 预计稍后会在其他线程中解决,我们应该尽快释放请求线程。

事实上,关于 DeferredResult 的 Spring 文档说:

@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<String>();
// Save the deferredResult somewhere..
return deferredResult;
}

// From some other thread...
deferredResult.setResult(data);

您的代码中的问题恰恰是 DeferredResult 在同一个请求线程中被解决。

所以,问题是当 Spring 代理请求连接到数据库池时,当您进行重负载测试时,许多请求会发现池已满并且没有可用连接。因此请求被搁置,但此时您的 DeferredResult 尚未创建,因此其超时功能不存在。

您的请求基本上是在等待来自数据库池的某个连接变为可用。因此,假设 5 秒过去了,然后请求获得连接,现在您获得 Controller 用来处理响应的 DeferredResult。最终,5 秒后超时。因此,您必须添加等待来自池的连接的时间和等待 DeferredResult 得到解析的时间。

这就是为什么您可能会看到,当您使用 JMeter 进行测试时,请求时间会随着数据库池中的连接耗尽而逐渐增加。

您可以通过在 application.properties 文件中添加以下内容来为线程池启用一些日志记录:

logging.level.com.zaxxer.hikari=DEBUG

您还可以配置数据库池的大小,甚至添加一些 JMX 支持,以便您可以从 Java Mission Control 对其进行监控:

spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.registerMbeans=true

使用 JMX 支持,您将能够看到数据库池是如何耗尽的。

这里的技巧在于将解析 promise 的逻辑移动到另一个线程:

@Override
public DeferredResult pull(Long previousId, String username) {


DeferredResult result = createPollingResult(previousId, username);

CompletableFuture.runAsync(() -> {
//this is where you encapsulate your db transaction
List<MessageDTO> messages = messageService.findRecents(previousId, username); // should be final or effective final
if (messages.isEmpty()) {
pollingResults.putIfAbsent(username, result);
} else {
result.setResult(messages);
}
});

return result;
}

通过这样做,您的 DeferredResult 会立即返回,并且 Spring 可以在释放宝贵的 Tomcat 线程的同时发挥其异步请求处理的魔力。

关于java - 在不增加执行时间的情况下多次调用返回 DeferredResults 的异步服务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50553383/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com