- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个典型的生产者-消费者问题:
多个生产者应用程序将作业请求写入 PostgreSQL 数据库上的作业表。
作业请求有一个状态字段,该字段在创建时开始包含 QUEUED。
当生产者插入新记录时,有多个消费者应用程序被规则通知:
CREATE OR REPLACE RULE "jobrecord.added" AS
ON INSERT TO jobrecord DO
NOTIFY "jobrecordAdded";
他们将尝试通过将其状态设置为 RESERVED 来保留新记录。当然,只有一个消费者应该成功。所有其他消费者不应该能够保留相同的记录。他们应该保留 state=QUEUED 的其他记录。
例子:一些生产者将以下记录添加到表jobrecord:
id state owner payload
------------------------
1 QUEUED null <data>
2 QUEUED null <data>
3 QUEUED null <data>
4 QUEUED null <data>
现在,两个消费者 A, B 想要处理它们。他们同时开始运行。一个应该保留 id 1,另一个应该保留 id 2,然后第一个完成的应该保留 id 3,依此类推。
在纯多线程世界中,我会使用互斥锁来控制对作业队列的访问,但消费者是可能在不同机器上运行的不同进程。它们只访问同一个数据库,因此所有同步都必须通过数据库进行。
我阅读了很多关于 PostgreSQL 中并发访问和锁定的文档,例如http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Select unlocked row in Postgresql PostgreSQL and locking
从这些主题中,我了解到以下 SQL 语句应该可以满足我的需要:
UPDATE jobrecord
SET owner= :owner, state = :reserved
WHERE id = (
SELECT id from jobrecord WHERE state = :queued
ORDER BY id LIMIT 1
)
RETURNING id; // will only return an id when they reserved it successfully
不幸的是,当我在多个消费者进程中运行它时,在大约 50% 的时间里,它们仍然保留相同的记录,既处理它又覆盖另一个的更改。
我错过了什么? SQL语句要怎么写才能让多个消费者不保留同一条记录?
最佳答案
我也将 postgres 用于 FIFO 队列。我最初使用 ACCESS EXCLUSIVE,它在高并发时产生正确的结果,但不幸的是与 pg_dump 互斥,它在执行期间获取 ACCESS SHARE 锁。这会导致我的 next() 函数锁定很长时间(pg_dump 的持续时间)。这是 Not Acceptable ,因为我们是一家 24x7 商店,客户不喜欢半夜排队等候的时间。
我认为必须有一个限制较少的锁,它仍然是并发安全的,并且在 pg_dump 运行时不会锁定。我的搜索让我找到了这篇 SO 帖子。
然后我做了一些研究。
以下模式足以满足 FIFO 队列 NEXT() 函数的要求,它将作业的状态从queued 更新为running 而不会出现任何并发失败,也不会阻止 pg_dump:
SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE
查询:
begin;
lock table tx_test_queue in exclusive mode;
update
tx_test_queue
set
status='running'
where
job_id in (
select
job_id
from
tx_test_queue
where
status='queued'
order by
job_id asc
limit 1
)
returning job_id;
commit;
结果如下:
UPDATE 1
job_id
--------
98
(1 row)
这是一个 shell 脚本,它在高并发 (30) 下测试所有不同的锁定模式。
#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock FAIL
# accessShare FAIL
# rowShare FAIL
# rowExclusive FAIL
# shareUpdateExclusive SUCCESS
# share FAIL+DEADLOCKS
# shareRowExclusive SUCCESS
# exclusive SUCCESS
# accessExclusive SUCCESS, but LOCKS against pg_dump
#config
strategy="exclusive"
db=postgres
dbuser=postgres
queuecount=100
concurrency=30
# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &
echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
"noLock") strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"accessShare") strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"rowShare") strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"rowExclusive") strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"share") strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"shareRowExclusive") strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"exclusive") strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"accessExclusive") strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
*) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";
代码也在这里,如果你想编辑:https://gist.github.com/1083936
我正在更新我的应用程序以使用 EXCLUSIVE 模式,因为它是 a) 正确且 b) 不与 pg_dump 冲突的最严格的模式。我选择了最严格的限制,因为就从 ACCESS EXCLUSIVE 更改应用程序而无需成为 postgres 锁定方面的 super 专家而言,它似乎风险最小。
我对我的测试装置和答案背后的总体思路感到非常满意。我希望分享这个可以帮助其他人解决这个问题。
关于sql - 作业队列作为具有多个消费者的 SQL 表 (PostgreSQL),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6507475/
我的 postgresql 有问题,我复制了所有文件,然后将其删除。然后,我安装了新的,问题就解决了。现在可以将旧文件和文件导入新文件吗? 最佳答案 如果它们是相同的主要版本(即 9.0 到 9.0.
我想使用 Postgresql 9.2.2 来存储我的应用程序的数据。我不得不构建一个应该基于数据库级别的触发器(当数据库启动时,这个触发器将被触发并执行。),当 postgresql 服务器启动时是
我已经使用下面的查询从 Postgresql 目录表中获取 Sequence 对象的完整信息 select s.sequence_name, s.start_value, s.minimum_valu
Postgres 版本:9.3.4 我需要执行驻留在远程数据库中的函数。该函数根据给定的参数返回一个统计数据表。 我实际上只是在我的本地数据库中镜像该函数,以使用我的数据库角色和授权来锁定对该函数的访
我在 CentOS 7 上,我正在尝试解决“PG::ConnectionBad: FATAL: Peer authentication failed for user”错误。 所以我已经想出我应该更改
我写了一个触发器函数,在触发器表列名上循环,我从具有不同列的不同表调用该函数。该函数将列名插入到数组中并在它们上循环,以便将值插入到另一个模式和表中。 函数和触发器创建脚本: DROP TRIGGER
PostgreSQL 的默认空闲连接超时是多少,我运行了 show idle_in_transaction_session_timeout 查询并返回了 0,但是值 0 表示此选项被禁用,但我想知道默
我需要将十六进制值存储到数据库表中,谁能推荐我需要用于属性的数据类型? 提前致谢 最佳答案 您可以使用bytea 来存储十六进制格式。更多信息 can be found in the postgres
我有一个具有复合主键的(大)表,由 5 列(a、b、c、d、e)组成。 我想高效地选择具有其中两列 (a + e) 的所有行到给定值。 在 PostgreSQL 中,我需要索引吗?或者数据库会使用主键
在阅读 PostreSQL (13) 文档时,我遇到了 this页面,其中列出了不同日期时间类型的存储大小。 除其他外,它指出: Name Storag
我有两个大整数的巨大表(500 000 000 行)。两列都被单独索引。我正在使用语法批量插入此表: INSERT into table (col1, col2) VALUES(x0, y0), (x
有一台 CentOS7 Linux 机器正在运行(不是由我管理;拥有有限的权限)。 请求在其中设置 PostgreSQL。 刚刚从 CentOS 存储库安装了 PostgreSQL: sudo yum
我在 Ubuntu 18.04 上安装了 Postgresql 10,但不知何故坏了,不会重新启动。我可以重新安装它而不破坏它的数据库,以便我可以再次访问数据库吗? pg_dump 不起作用。 最佳答
我想在 UNIX 中使用 crontab 自动备份 PostgreSQL 数据库。我已经尝试过,但它会创建 0 字节备份。 我的 crontab 条目是: 24 * * * * /home/desk
我已经完成了PG服务器的安装。我希望能够使用 pgAdmin 远程连接到它,但不断收到服务器不听错误。 could not connect to server: Connection refused
Oracle 支持波斯历但需要知道 PostgreSQL 是否支持波斯历? 如果是,那么我们如何在 PostgreSQL 中将默认日历类型设置为 Persian 而不是 Gregorian(在 Ora
假设我们有一个带有表的 SQL 数据库 Person以及访问它的几个应用程序。出于某种原因,我们想修改 Person表以向后不兼容的方式。 保持兼容性的一种潜在解决方案是将表重命名为 User并创建一
我使用 PostgreSQL 中的模式来组织我庞大的会计数据库。每年年底,我都会通过为下一年创建一个新模式来进行协调过程。 新模式的文件是否与旧模式物理分离?或者所有模式一起存储在硬盘上? 这对我来说
我正在尝试使用配置文件中的以下配置参数调整 PostgreSQL 服务器: autovacuum_freeze_max_age = 500000000 autovacuum_max_workers =
我的数据包含数据库列中的表情符号,即 message_text ------- 🙂 😀 Hi 😀 我只想查询包含表情符号的数据的行。在 postgres 中是否有一种简单的方法可以做到这一点?
我是一名优秀的程序员,十分优秀!