gpt4 book ai didi

grails - 无法找出StaleObjectStateException的原因

转载 作者:行者123 更新时间:2023-12-02 15:57:25 25 4
gpt4 key购买 nike

我很难弄清楚我不断看到的原因:

`HibernateOptimisticLockingFailureException: FlowExecution: optimistic locking failed; nested exception is org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)`

我有一个使用Quartz Scheduler触发作业的服务,在我的上下文中,这些作业称为 Flows,每个流程可能由几个 Tasks组成,流程和任务为 Executables,有关其实际 Executions的信息存储为 FlowExecutionsTaskExecutions。该服务使用 FlowService启动流。

UPD:有一个Quartz Job,“ExecutorJob”负责触发我的流程/任务。触发后,它将使用FlowService启动它应该执行的操作。所以我想知道 quartz 线程是否有可能在每次使用服务时都没有创建新的休眠 session ,这就是问题的原因。我没有更改FlowService的范围,因此它是一个单例,GORM如何管理它使用的 session ?

UPD2:在ExecutorJob上使用persistenceContextInterceptor进行了尝试,以确保每次使用该服务都使用一个新 session ,但是并没有解决问题。添加了ExecutorJob的简化代码。

我无法在本地重现该问题,但是它在生产中经常发生,尤其是在有大量启动流程时。
我已经尝试同步任务和流程的 execute方法,但是没有用,我现在尝试使用悲观锁,但是我猜这不能解决问题,因为检查应用程序日志似乎没有问题。两个线程更新同一行。接下来,我尝试显示模仿项目结构的简化代码。
// ------------------
// DOMAIN CLASSES
// ------------------
abstract class Executable {
static hasMany = [flowTasks: FlowTask]
static transients = ['executions']

List<Execution> getExecutions() {
this.id ? Execution.findAllByExecutable(this) : []
}

void addToExecutions(Execution execution) {
execution.executable = this
execution.save()
}

abstract List<Execution> execute(Map params)
}

class Flow extends Executable {
SortedSet<FlowTask> tasks
static hasMany = [tasks: FlowTask]

private static final Object lockExecute = new Object()
private static final Object lockExecuteTask = new Object()

List<FlowExecution> execute(Map params) {
synchronized (lockExecute) {
List<Map> multiParams = multiplyParams(params)
multiParams.collect { Map param ->
FlowExecution flowExecution = new FlowExecution()
addToExecutions(flowExecution)
flowExecution.save()
this.attach()
save()
executeTasks(firstTasks(param), flowExecution, param)
}
}
}

List<Map> multiplyParams(Map params) {
// creates a list of params for the executions that must be started
[params]
}

Set<FlowTask> firstTasks(Map params) {
// finds the first tasks to be executed for the flow
tasks.findAdll { true }
}

private FlowExecution executeTasks(Set<FlowTask> tasks, FlowExecution flowExecution, Map params) {
synchronized (lockExecuteTask) {
tasks.each { FlowTask flowTask ->
try {
List<Execution> executions = flowTask.execute(params)
executions.each { Execution execution ->
flowExecution.addToExecutions(execution)
}
flowExecution.attach()
} catch {
// log error executing task
throw e
}
}

this.attach()
try {
save(flush: true)
} catch (HibernateOptimisticLockingFailureException e) {
// log error saving flow
throw e
}

flowExecution
}
}

}

class Task extends Executable {
private static final Object lockExecute = new Object()
private static final Object lockGetExecution = new Object()

TaskExecution execute(TaskExecution execution) {
taskService.start(execution)
execution
}

List<TaskExecution> execute(Map params) {
synchronized (lockExecute) {
List<Map> multiExecParams = multiplyParams(params)
multiExecParams.collect { Map param ->
TaskExecution execution = getExecution(param)
execute(execution)
}
}
}

TaskExecution getExecution(Map params) {
synchronized (lockGetExecution) {
TaskExecution execution = new TaskExecution(executable: this)
execution.setParameters(params)
addToExecutions(execution)

execution.attach()
execution.flowExecution?.attach()
this.attach()
try {
save(flush: true)
} catch (HibernateOptimisticLockingFailureException e) {
// log error saving task
throw e
}

execution
}
}

List<Map> multiplyParams(Map params) {
// creates a list of params for the tasks that must be started
[params]
}

}

class FlowTask {
static belongsTo = [flow: Flow, executable: Executable]

List<Execution> execute(Map params) {
executable.execute(params)
}
}

abstract class Execution {
Map parameterData = [:]
static belongsTo = [executable: Executable, flowExecution: FlowExecution]
static transients = ['parameters', 'taskExecutions']
void setParameters(Map params) {
params.each { key, value ->
parameterData[key] = JsonParser.toJson(value)
}
}
}

class TaskExecution extends Execution {
}

class FlowExecution extends Execution {
List<Execution> executions
static transients = ['executions']

FlowExecution() {
executions = []
}

Set<TaskExecution> getTaskExecutions() {
executions?.collect { Execution execution ->
return execution.taskExecution
}?.flatten()?.toSet()
}

void addToExecutions(Execution execution){
executions.add(execution)
execution.flowExecution = this
execution.save()
}

def onLoad() {
try {
executions = this.id ? Execution.findAllByFlowExecution(this) : []
} catch (Exception e){
log.error(e)
[]
}
}
}

// -----------------
// SERVICE CLASSES
// -----------------
class FlowService {

Map start(long flowId, Map params) {
Flow flow = Flow.lock(flowId)

startFlow(flow, params)
}

private Map startFlow(Flow flow, Map params) {
List<RunningFlow> runningFlows = flow.execute(params)

[data: [success: true], status: HTTP_OK]
}
}

//--------------------------------------
// Quartz job
//--------------------------------------
class ExecutorJob implements InterruptableJob {

def grailsApplication = Holders.getGrailsApplication()

static triggers = {}

private Thread thread

void execute(JobExecutionContext context) throws JobExecutionException {
thread = Thread.currentThread()
synchronized (LockContainer.taskLock) {
Map params = context.mergedJobDataMap
def persistenceInterceptor = persistenceInterceptorInstance

try {
persistenceInterceptor.init()

Long executableId = params.executableId as Long

def service = (Executable.get(executableId) instanceof Flow) ? flowServiceInstance : taskServiceInstance
service.start(executableId, params)
} catch (Exception e) {
// log error
} finally {
persistenceInterceptor.flush()
persistenceInterceptor.destroy()
}
}
}

PersistenceContextInterceptor getPersistenceInterceptorInstance() {
grailsApplication.mainContext.getBean('persistenceInterceptor')
}

FluxoService getFlowServiceInstance() {
grailsApplication.mainContext.getBean('flowService')
}

TarefaService getTaskServiceInstance() {
grailsApplication.mainContext.getBean('taskService')
}

@Override
void interrupt() throws UnableToInterruptJobException {
thread?.interrupt()
}
}

有人知道可能会有帮助吗?

最佳答案

好吧,很难理解出了什么问题。但是,我猜想当 session 中的某个对象已被其他事务保存或更新时,会引发此错误。同样,当休眠模式尝试保存该对象时,它会给出行被另一个事务错误更新的错误。

我猜您可以在保存对象并进行查看之前尝试刷新。

http://grails.github.io/grails-doc/2.3.4/ref/Domain%20Classes/refresh.html

def b = Book.get(1)

b.refresh()

关于grails - 无法找出StaleObjectStateException的原因,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32787918/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com