gpt4 book ai didi

postgresql - Postgresql 中的意外死锁(使用 psycopg2 时)

转载 作者:行者123 更新时间:2023-11-29 11:47:01 28 4
gpt4 key购买 nike


我正在处理 PostgreSQL 中的一个我不理解的死锁问题。
我正在尝试使用 Python、psycopg2 模块和 Postgres 数据库实现类似 Round Robin 的算法。
我希望应用程序的多个实例执行以下操作:
- 在很短的时间内用任务列表锁定整个表
- 选择要执行的任务(最近最少执行的任务,有一些限制)
- 标记任务,这样其他实例就不会选择它(只允许一个实例同时执行相同的任务)
- 解锁 table
- 执行任务
- 重复
其他 session 也应该能够更新该表的某些字段。
突然间,我遇到了无法解释的僵局。我尽可能地简化了我的 Python 脚本,我在每条语句之后执行一次提交(如果可能的话),但仍然时不时地出现死锁。
出于某种原因,每次遇到死锁时,它都是事务中的第一条语句。这怎么可能?我的表没有任何触发器、外键约束或任何会使事情变得复杂的东西。我能想到的唯一解释是 PostgreSQL 不会在提交后立即释放锁。或者可能是 psycopg2 没有按我预期的方式工作?我未能通过在不同 session 中手动运行语句来重现该问题。
死锁很少见,但我至少每隔几个小时就会遇到一次

我在 PostgreSQL 9.6.1 和 Python 2.7.12 上运行

这是我运行的代码(这只是我为解决问题而制作的简化示例):

import psycopg2
import sys
import datetime
import time
sys.path.append('/opt/workflow/lib')
import config
import ovs_lib


instance_type='scan_master'
instance_id=sys.argv[1]

dbh=psycopg2.connect(dbname=config.values['pgsql']['db'], host=config.values['pgsql']['host'], port=int(config.values['pgsql']['port']), user=config.values['pgsql']['user'], password=config.values['pgsql']['pass'])
dbh.set_session(isolation_level='READ COMMITTED', autocommit=False)
cursor = dbh.cursor()
cursor.execute("SET search_path TO "+config.values['pgsql']['schema'])

def sanitize(string):
string=string.replace("'","''")
return string

def get_task(instance_id):
task_id=None
out_struct={}
instance_id=sanitize(instance_id)
#Lock whole table
dbh.commit() #Just in case
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table
cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id='"+instance_id+"'") #release task from previous run
#Now get the task
sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\n"
sql+="FROM wf_task t\n"
sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\n"
sql+="WHERE status='A'\n"
sql+="AND t.scanner_instance_id is NULL\n"
sql+="AND last_scan_ts<=now()-scan_interval*interval '1 second'\n"
sql+="ORDER BY last_scan_ts\n"
sql+="LIMIT 1\n"
cursor.execute(sql)
cnt=cursor.rowcount
if cnt>0:
row=cursor.fetchone()
task_id=row[0]
sql ="UPDATE wf_task SET scanner_instance_id='"+instance_id+"',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)
cursor.execute(sql)
scanner_function=row[1]
parallel_runs=row[2]
out_struct['task_id']=task_id
out_struct['scanner_function']=scanner_function
out_struct['parallel_runs']=parallel_runs
dbh.commit()
return out_struct

def process_task(task_id):
sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()

while True:
if not ovs_lib.check_control(instance_type, instance_id):
now_time=datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
print now_time+" Stop sygnal received"
exit(0)
task_struct=get_task(instance_id)
if 'task_id' not in task_struct:
time.sleep(1)
continue
process_task(task_struct['task_id'])

下面是我得到的错误示例:

Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 70, in <module>
process_task(task_struct['task_id'])
File "/opt/workflow/bin/scan_simple.py", line 58, in process_task
cursor.execute(sql)
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.
Process 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.
Process 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.
HINT: See server log for query details.
CONTEXT: while updating tuple (8,12) in relation "wf_task"

Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 66, in <module>
task_struct=get_task(instance_id)
File "/opt/workflow/bin/scan_simple.py", line 27, in get_task
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.
Process 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.
HINT: See server log for query details.
CONTEXT: while locking tuple (17,9) in relation “wf_task"

当时我同时运行了 6 个这个脚本的实例数据库中没有其他 session 处于事件状态。

稍后更新
今天我学到了一些关于 Postgres 的新知识,它与这个问题非常相关
从 9.5 版开始,PostgreSQL 支持 SKIP LOCKED 语句,它以非常优雅的方式解决了我试图设计我的应用程序的问题
如果您在尝试实现某种队列或循环解决方案时正在努力解决 PostgreSQL 中的并发问题,那么您绝对必须阅读以下内容:
https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/

最佳答案

问题可能是第一个 SELECT ... FOR UPDATE 中的顺序扫描并不总是以相同的顺序返回行,因此并发执行此语句会锁定不同顺序的表。这会导致您遇到僵局。

有几种解决方案,可以增加善良:

  • 我认为为这次更新锁定整个表的技术对性能来说很糟糕,但如果你坚持保留你的代码,你可以设置 synchronize_seqscans off 以便所有顺序扫描以相同的顺序返回行。但是你真的不应该像你那样锁定表中的所有行,因为

    • 它会导致不必要的顺序扫描。

    • 这不安全。在您锁定行和运行 UPDATE 之间,有人可以INSERT 新行。

  • 如果您真的想锁定整个表,请使用 LOCK TABLE 语句而不是锁定表中的所有行。这也将摆脱僵局。

  • 最好的解决方案可能是使用 UPDATE 本身锁定行。为避免死锁,请检查 PostgreSQL 用于 UPDATE 的执行计划。这将是索引扫描或顺序扫描。使用索引扫描是安全的,因为它会按特定顺序返回行。对于顺序扫描,禁用上面提到的 synchronize_seqscans 功能,理想情况下仅针对事务:

    START TRANSACTION;
    SET LOCAL synchronize_seqscans = off;
    /* your UPDATEs go here */
    COMMIT;

关于postgresql - Postgresql 中的意外死锁(使用 psycopg2 时),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42729849/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com