gpt4 book ai didi

java - Spring Batch 自定义编写器 close 方法被调用两次(需要特殊处理)

转载 作者:行者123 更新时间:2023-12-01 21:49:27 26 4
gpt4 key购买 nike

我有以下编写器类原型(prototype):

public class MyWriter extends AbstractItemStreamItemWriter<FieldSet> {
...
@Override
public void close()
{
...
{
...
}

我的工作是由这些@Beans定义的:

    @Bean
protected Step step(ItemReader<FieldSet> reader, ItemWriter<FieldSet> writer)
{
return stepBuilder.get("step")
.<FieldSet, FieldSet> chunk(chunkSize)
.reader(reader)
.writer(writer)
.listener(this)
.build();
}

@Bean
protected Job myImportJob(Step step, JobExecutionListener jobExecutionListener)
{
return jobBuilder
.get("myImportJob")
.listener(jobExecutionListener)
.start(step)
.build();
}

该作业是从 MQ 监听器触发的,如下所示:

@Autowired
private Job job;

@Autowired
private JobLauncher jobLauncher;

@JmsListener(destination = "queue_name")
public void receiveMessage(TextMessage message) throws JMSException
{
log.warn("Received message {} with listener {}.", message, this);

JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();

// add job parameters here
...

JobParameters jobParameters = jobParametersBuilder.toJobParameters();

try
{
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
log.trace("jobExecution = " + jobExecution);
}
catch (JobExecutionException jee)
{
log.warn("Could not run job {}", job);
}
}

我遇到的问题是 MyWriter 的 close 方法被调用两次。每个调用的堆栈跟踪是:

Thread [DefaultMessageListenerContainer-2] (Suspended (breakpoint at line 162 in MyWriter)) 
MyWriter.close() line: 162
MyWriter$$FastClassBySpringCGLIB$$80508a22.invoke(int, Object, Object[]) line: not available
MethodProxy.invoke(Object, Object[]) line: 204
CglibAopProxy$CglibMethodInvocation.invokeJoinpoint() line: 720
CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 157
DelegatingIntroductionInterceptor.doProceed(MethodInvocation) line: 133
DelegatingIntroductionInterceptor.invoke(MethodInvocation) line: 121
CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 179
CglibAopProxy$DynamicAdvisedInterceptor.intercept(Object, Method, Object[], MethodProxy) line: 655
MyWriter$$EnhancerBySpringCGLIB$$e345242b.close() line: not available
CompositeItemStream.close() line: 85
TaskletStep.close(ExecutionContext) line: 305
TaskletStep(AbstractStep).execute(StepExecution) line: 271
--> SimpleStepHandler.handleStep(Step, JobExecution) line: 148
SimpleJob(AbstractJob).handleStep(Step, JobExecution) line: 392
SimpleJob.doExecute(JobExecution) line: 135
SimpleJob(AbstractJob).execute(JobExecution) line: 306
SimpleJobLauncher$1.run() line: 135
SyncTaskExecutor.execute(Runnable) line: 50
SimpleJobLauncher.run(Job, JobParameters) line: 128
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43
Method.invoke(Object, Object...) line: 497
AopUtils.invokeJoinpointUsingReflection(Object, Method, Object[]) line: 302
ReflectiveMethodInvocation.invokeJoinpoint() line: 190
ReflectiveMethodInvocation.proceed() line: 157
SimpleBatchConfiguration$PassthruAdvice.invoke(MethodInvocation) line: 127
ReflectiveMethodInvocation.proceed() line: 179
JdkDynamicAopProxy.invoke(Object, Method, Object[]) line: 208
$Proxy105.run(Job, JobParameters) line: not available
MyMQListener$$EnhancerBySpringCGLIB$$28277c0e(MyMQListener).receiveMessage(TextMessage) line: 74
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43
Method.invoke(Object, Object...) line: 497
InvocableHandlerMethod.doInvoke(Object...) line: 198
InvocableHandlerMethod.invoke(Message<?>, Object...) line: 116
MessagingMessageListenerAdapter.invokeHandler(Message, Session, Message<?>) line: 90
MessagingMessageListenerAdapter.onMessage(Message, Session) line: 66
DefaultMessageListenerContainer(AbstractMessageListenerContainer).doInvokeListener(SessionAwareMessageListener, Session, Message) line: 721
DefaultMessageListenerContainer(AbstractMessageListenerContainer).invokeListener(Session, Message) line: 681
DefaultMessageListenerContainer(AbstractMessageListenerContainer).doExecuteListener(Session, Message) line: 651
DefaultMessageListenerContainer(AbstractPollingMessageListenerContainer).doReceiveAndExecute(Object, Session, MessageConsumer, TransactionStatus) line: 315
DefaultMessageListenerContainer(AbstractPollingMessageListenerContainer).receiveAndExecute(Object, Session, MessageConsumer) line: 253
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener() line: 1150
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop() line: 1142
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run() line: 1039
Thread.run() line: 745

Thread [DefaultMessageListenerContainer-2] (Suspended (breakpoint at line 162 in MyWriter)) 
MyWriter.close() line: 162
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43
Method.invoke(Object, Object...) line: 497
DisposableBeanAdapter.invokeCustomDestroyMethod(Method) line: 354
DisposableBeanAdapter.destroy() line: 277
DisposableBeanAdapter.run() line: 236
StepContext.close() line: 213
StepSynchronizationManager$1.close(StepContext) line: 53
StepSynchronizationManager$1.close(Object) line: 36
StepSynchronizationManager$1(SynchronizationManagerSupport<E,C>).release() line: 190
StepSynchronizationManager.release() line: 112
TaskletStep(AbstractStep).doExecutionRelease() line: 290
TaskletStep(AbstractStep).execute(StepExecution) line: 278
--> SimpleStepHandler.handleStep(Step, JobExecution) line: 148
SimpleJob(AbstractJob).handleStep(Step, JobExecution) line: 392
SimpleJob.doExecute(JobExecution) line: 135
SimpleJob(AbstractJob).execute(JobExecution) line: 306
SimpleJobLauncher$1.run() line: 135
SyncTaskExecutor.execute(Runnable) line: 50
SimpleJobLauncher.run(Job, JobParameters) line: 128
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43
Method.invoke(Object, Object...) line: 497
AopUtils.invokeJoinpointUsingReflection(Object, Method, Object[]) line: 302
ReflectiveMethodInvocation.invokeJoinpoint() line: 190
ReflectiveMethodInvocation.proceed() line: 157
SimpleBatchConfiguration$PassthruAdvice.invoke(MethodInvocation) line: 127
ReflectiveMethodInvocation.proceed() line: 179
JdkDynamicAopProxy.invoke(Object, Method, Object[]) line: 208
$Proxy105.run(Job, JobParameters) line: not available
MyMQListener$$EnhancerBySpringCGLIB$$28277c0e(MyMQListener).receiveMessage(TextMessage) line: 74
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43
Method.invoke(Object, Object...) line: 497
InvocableHandlerMethod.doInvoke(Object...) line: 198
InvocableHandlerMethod.invoke(Message<?>, Object...) line: 116
MessagingMessageListenerAdapter.invokeHandler(Message, Session, Message<?>) line: 90
MessagingMessageListenerAdapter.onMessage(Message, Session) line: 66
DefaultMessageListenerContainer(AbstractMessageListenerContainer).doInvokeListener(SessionAwareMessageListener, Session, Message) line: 721
DefaultMessageListenerContainer(AbstractMessageListenerContainer).invokeListener(Session, Message) line: 681
DefaultMessageListenerContainer(AbstractMessageListenerContainer).doExecuteListener(Session, Message) line: 651
DefaultMessageListenerContainer(AbstractPollingMessageListenerContainer).doReceiveAndExecute(Object, Session, MessageConsumer, TransactionStatus) line: 315
DefaultMessageListenerContainer(AbstractPollingMessageListenerContainer).receiveAndExecute(Object, Session, MessageConsumer) line: 253
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener() line: 1150
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop() line: 1142
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run() line: 1039
Thread.run() line: 745

堆栈中的最后一个公共(public)行是 SimpleStepHandler.handleStep(Step, JobExecution) line: 148,用箭头标记。

我目前有一个特殊的类级 boolean 变量,它的存在只是为了确保 close 方法的内部只运行一次,但这对我来说似乎不太正确。

读取或写入没有异常或问题。这不是由于“跳过”或“重新启动”功能造成的。

如何防止 ItemWriter.close() 方法被调用两次?

最佳答案

由于这个错误,我非常头疼。

Spring 在使用 Java 配置时尝试自动推断 destroyMethod(但在使用 XML 配置时不会这样做)。要禁用此自动推理,请使用:

 @Bean(destroyMethod="")

现在使用这个注释Spring停止调用我的方法两次,它只会由 Spring 批处理调用一次。

关于java - Spring Batch 自定义编写器 close 方法被调用两次(需要特殊处理),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35402616/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com