gpt4 book ai didi

sql - 作业队列作为具有多个消费者的 SQL 表 (PostgreSQL)

转载 作者:行者123 更新时间:2023-11-29 11:06:56 26 4
gpt4 key购买 nike

我有一个典型的生产者-消费者问题:

多个生产者应用程序将作业请求写入 PostgreSQL 数据库上的作业表。

作业请求有一个状态字段,该字段在创建时开始包含 QUEUED。

当生产者插入新记录时,有多个消费者应用程序被规则通知:

CREATE OR REPLACE RULE "jobrecord.added" AS
ON INSERT TO jobrecord DO
NOTIFY "jobrecordAdded";

他们将尝试通过将其状态设置为 RESERVED 来保留新记录。当然,只有一个消费者应该成功。所有其他消费者不应该能够保留相同的记录。他们应该保留 state=QUEUED 的其他记录。

例子:一些生产者将以下记录添加到表jobrecord:

id state  owner  payload
------------------------
1 QUEUED null <data>
2 QUEUED null <data>
3 QUEUED null <data>
4 QUEUED null <data>

现在,两个消费者 A, B 想要处理它们。他们同时开始运行。一个应该保留 id 1,另一个应该保留 id 2,然后第一个完成的应该保留 id 3,依此类推。

在纯多线程世界中,我会使用互斥锁来控制对作业队列的访问,但消费者是可能在不同机器上运行的不同进程。它们只访问同一个数据库,因此所有同步都必须通过数据库进行。

我阅读了很多关于 PostgreSQL 中并发访问和锁定的文档,例如http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Select unlocked row in Postgresql PostgreSQL and locking

从这些主题中,我了解到以下 SQL 语句应该可以满足我的需要:

UPDATE jobrecord
SET owner= :owner, state = :reserved
WHERE id = (
SELECT id from jobrecord WHERE state = :queued
ORDER BY id LIMIT 1
)
RETURNING id; // will only return an id when they reserved it successfully

不幸的是,当我在多个消费者进程中运行它时,在大约 50% 的时间里,它们仍然保留相同的记录,既处理它又覆盖另一个的更改。

我错过了什么? SQL语句要怎么写才能让多个消费者不保留同一条记录?

最佳答案

我也将 postgres 用于 FIFO 队列。我最初使用 ACCESS EXCLUSIVE,它在高并发时产生正确的结果,但不幸的是与 pg_dump 互斥,它在执行期间获取 ACCESS SHARE 锁。这会导致我的 next() 函数锁定很长时间(pg_dump 的持续时间)。这是 Not Acceptable ,因为我们是一家 24x7 商店,客户不喜欢半夜排队等候的时间。

我认为必须有一个限制较少的锁,它仍然是并发安全的,并且在 pg_dump 运行时不会锁定。我的搜索让我找到了这篇 SO 帖子。

然后我做了一些研究。

以下模式足以满足 FIFO 队列 NEXT() 函数的要求,它将作业的状态从queued 更新为running 而不会出现任何并发失败,也不会阻止 pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

查询:

begin;
lock table tx_test_queue in exclusive mode;
update
tx_test_queue
set
status='running'
where
job_id in (
select
job_id
from
tx_test_queue
where
status='queued'
order by
job_id asc
limit 1
)
returning job_id;
commit;

结果如下:

UPDATE 1
job_id
--------
98
(1 row)

这是一个 shell 脚本,它在高并发 (30) 下测试所有不同的锁定模式。

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock FAIL
# accessShare FAIL
# rowShare FAIL
# rowExclusive FAIL
# shareUpdateExclusive SUCCESS
# share FAIL+DEADLOCKS
# shareRowExclusive SUCCESS
# exclusive SUCCESS
# accessExclusive SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
"noLock") strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"accessShare") strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"rowShare") strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"rowExclusive") strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"share") strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"shareRowExclusive") strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"exclusive") strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"accessExclusive") strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
*) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

代码也在这里,如果你想编辑:https://gist.github.com/1083936

我正在更新我的应用程序以使用 EXCLUSIVE 模式,因为它是 a) 正确且 b) 不与 pg_dump 冲突的最严格的模式。我选择了最严格的限制,因为就从 ACCESS EXCLUSIVE 更改应用程序而无需成为 postgres 锁定方面的 super 专家而言,它似乎风险最小。

我对我的测试装置和答案背后的总体思路感到非常满意。我希望分享这个可以帮助其他人解决这个问题。

关于sql - 作业队列作为具有多个消费者的 SQL 表 (PostgreSQL),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6507475/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com