gpt4 book ai didi

java - Spring 批处理 : How to manage a multi-thread jobs connection to one DataSource (Problem with the reader datasource connection)

转载 作者:太空宇宙 更新时间:2023-11-04 09:07:39 30 4
gpt4 key购买 nike

我有一个多线程作业(同​​时运行 6 个作业),这些作业使用 3 个数据源(第一个数据源上有 2 个,第二个数据源上有另外 2 个,第三个数据源上有最后 2 个)。

/*
**********************************
*
* DATASOURCES CONFIG
*
**********************************
*/

@Bean
@Primary
@ConfigurationProperties(prefix = "spring.datasource.first")
public DataSource datasource() {
return DataSourceBuilder.create().build();
}

@Bean(name="datasourcereplicaone")
@ConfigurationProperties(prefix = "spring.datasource.replicaone")
public DataSource datasourceReplicaOne() {
return DataSourceBuilder.create().build();
}

@Bean(name="datasourcereplicatwo")
@ConfigurationProperties(prefix = "spring.datasource.replicatwo")
public DataSource datasourceReplicaTwo() {
return DataSourceBuilder.create().build();
}

问题出在 itemReader 上:

@Bean(name = "readerfeedsurvcapacity", destroyMethod = "")
@StepScope
public JdbcCursorItemReader<Long> readerfeedsurvcapacity(@Value("#{jobParameters['exchangeIdSession']}")String exchangeIdSession,@Value("#{jobParameters['hostbase_param']}")String hostbase_param) {
//recupere en seconde le retard
String sqlRequest = "My sql request that run in 0.4 sec";
logger.debug("[BatchConfiguration][readerfeedsurvcapacity] Requete SQL du job feedsurvcapacity : " + sqlRequest);

JdbcCursorItemReader<Long> dbReader = new JdbcCursorItemReader<>();
switch (hostbase_param) {
case "PRIMARY":
dbReader.setDataSource(datasource());
break;
case "R1":
dbReader.setDataSource(datasourceReplicaOne());
break;
default:
dbReader.setDataSource(datasourceReplicaTwo());
break;
}

dbReader.setSql(sqlRequest);
dbReader.setRowMapper(new InstrumentSessionRowMapper());
return dbReader;
}

每一分钟,它们将是同时使用数据源的 6 个作业中的 2 个,例如,我认为因为 sqlrequest 在 0.4 秒内运行,所以其中一个作业关闭数据源,另一个作业没有时间完成,所以我出现此错误:

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

Last packet sent to the server was 1001 ms ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:406)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1074)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2873)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2763)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3299)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1837)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:1961)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2543)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1737)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1888)
at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeQuery(ProxyPreparedStatement.java:52)
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeQuery(HikariProxyPreparedStatement.java)
at org.springframework.batch.item.database.JdbcCursorItemReader.openCursor(JdbcCursorItemReader.java:127)
at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:428)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:150)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at org.springframework.batch.item.database.JdbcCursorItemReader$$EnhancerBySpringCGLIB$$88b6e64a.open(<generated>)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at com.mysql.jdbc.util.ReadAheadInputStream.fill(ReadAheadInputStream.java:113)
at com.mysql.jdbc.util.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:160)
at com.mysql.jdbc.util.ReadAheadInputStream.read(ReadAheadInputStream.java:188)
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2329)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2774)
... 32 more
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

Last packet sent to the server was 1007 ms ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:406)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1074)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2873)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2763)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3299)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1837)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:1961)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2543)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1737)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1888)
at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeQuery(ProxyPreparedStatement.java:52)
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeQuery(HikariProxyPreparedStatement.java)
at org.springframework.batch.item.database.JdbcCursorItemReader.openCursor(JdbcCursorItemReader.java:127)
at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:428)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:150)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at org.springframework.batch.item.database.JdbcCursorItemReader$$EnhancerBySpringCGLIB$$88b6e64a.open(<generated>)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at com.mysql.jdbc.util.ReadAheadInputStream.fill(ReadAheadInputStream.java:113)
at com.mysql.jdbc.util.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:160)
at com.mysql.jdbc.util.ReadAheadInputStream.read(ReadAheadInputStream.java:188)
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2329)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2774)
... 32 more
67850 [SimpleAsyncTaskExecutor-3] ERROR org.springframework.batch.core.step.AbstractStep - Encountered an error executing step feedsurvcapacitystep in job jobfeedsurvcapacity
org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:153)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at org.springframework.batch.item.database.JdbcCursorItemReader$$EnhancerBySpringCGLIB$$88b6e64a.open(<generated>)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at java.lang.Thread.run(Unknown Source)
Caused by: org.springframework.batch.item.ItemStreamException: Error while closing item reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.close(AbstractItemCountingItemStreamItemReader.java:142)
at org.springframework.batch.item.database.JdbcCursorItemReader.openCursor(JdbcCursorItemReader.java:131)
at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:428)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:150)
... 20 more
Caused by: java.sql.SQLException: Connection is closed
at com.zaxxer.hikari.pool.ProxyConnection$ClosedConnection.lambda$getClosedConnection$0(ProxyConnection.java:466)
at com.sun.proxy.$Proxy55.setAutoCommit(Unknown Source)
at com.zaxxer.hikari.pool.ProxyConnection.setAutoCommit(ProxyConnection.java:380)
at com.zaxxer.hikari.pool.HikariProxyConnection.setAutoCommit(HikariProxyConnection.java)
at org.springframework.batch.item.database.AbstractCursorItemReader.doClose(AbstractCursorItemReader.java:402)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.close(AbstractItemCountingItemStreamItemReader.java:139)
... 23 more

最佳答案

这就是我所说的工作,它是一个每分钟执行一次的调度程序

   for (int i = 0; i < quality.length; i++) {
try {
logger.debug("[ScheduledTasks][feedSurveillanceFluxCapacityEveryMinute] PRIMARY HOSTBASE");
jobLauncher.run(jobfeedsurvcapacity,
new JobParametersBuilder().addLong("date", System.currentTimeMillis())
.addString("quality_param", quality[i])
.addString("hostbase_param", "PRIMARY").addLong("exchangeIdSession", exchangeIdSession[i])
.toJobParameters());

logger.debug("[ScheduledTasks][feedSurveillanceFluxCapacityEveryMinute] REPLICA 1 HOSTBASE");
jobLauncher.run(jobfeedsurvcapacity,
new JobParametersBuilder().addLong("date", System.currentTimeMillis())
.addString("quality_param", quality[i]).addString("hostbase_param", "R1").addLong("exchangeIdSession", exchangeIdSession[i])
.toJobParameters());

logger.debug("[ScheduledTasks][feedSurveillanceFluxCapacityEveryMinute] REPLICA 2 HOSTBASE");
jobLauncher.run(jobfeedsurvcapacity,
new JobParametersBuilder().addLong("date", System.currentTimeMillis())
.addString("quality_param", quality[i]).addString("hostbase_param", "R2").addLong("exchangeIdSession", exchangeIdSession[i])
.toJobParameters());
} catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException | JobRestartException e) {
e.printStackTrace();
}
}

quality.length = 2 因此有 3 个作业称为 * 2

关于java - Spring 批处理 : How to manage a multi-thread jobs connection to one DataSource (Problem with the reader datasource connection),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60037506/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com