gpt4 book ai didi

PostgreSQL - 实现可靠队列

转载 作者:行者123 更新时间:2023-11-29 11:14:15 28 4
gpt4 key购买 nike

我正在尝试使用 postgres 数据库实现具有多个写入器和多个读取器的可靠队列。当队列读取器扫描表然后进行中的事务在读取后提交时,如何避免丢失行。

我们有一个阅读器使用“检查点”时间分批选择行,其中每个批处理获取前一批处理中最后一个时间戳之后的行,而我们缺少行。 (原因:时间戳值基于 INSERT 发生的时间(00.00.00)。在重负载下,如果事务需要更长的时间,它被插入,比如说 10 秒后(00.00.10),读者将错过这一行(row1) 如果它在那 10 秒内读取并找到其插入时间晚于 row1 的行 (00.00.05)。问题的完整描述类似于此博客中的描述。http://blog.thefourthparty.com/stopping-time-in-postgresql/ )

上下文相关的先前问题:Postgres LISTEN/NOTIFY - low latency, realtime?

更新:我已将问题从单一读者更新为多个读者。读者阅读的顺序很重要。

最佳答案

考虑到多个读者,有必要控制每个读者已经收到了哪些记录。

此外,据说订单也是将记录发送给读者的条件。因此,如果在较早的事务之前提交了一些进一步的事务,我们必须“停止”并在它提交时再次发送记录,以维护发送给读取器的记录的顺序。

也就是说,检查实现:

-- lets create our queue table 
drop table if exists queue_records cascade;
create table if not exists queue_records
(
cod serial primary key,
date_posted timestamp default timeofday()::timestamp,
message text
);


-- lets create a table to save "checkpoints" per reader_id
drop table if exists queue_reader_checkpoint cascade;
create table if not exists queue_reader_checkpoint
(
reader_id text primary key,
last_checkpoint numeric
);



CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text)
RETURNS SETOF queue_records AS
$BODY$
DECLARE
vLAST_CHECKPOINT numeric;
vCHECKPOINT_EXISTS integer;
vRECORD queue_records%rowtype;
BEGIN

-- let's get the last record sent to the reader
SELECT last_checkpoint
INTO vLAST_CHECKPOINT
FROM queue_reader_checkpoint
WHERE reader_id = pREADER_ID;

-- if vLAST_CHECKPOINT is null (this is the very first time of reader_id),
-- sets it to the last cod from queue. It means that reader will get records from now on.
if (vLAST_CHECKPOINT is null) then
-- sets the flag indicating the reader does not have any checkpoint recorded
vCHECKPOINT_EXISTS = 0;
-- gets the very last commited record
SELECT MAX(cod)
INTO vLAST_CHECKPOINT
FROM queue_records;
else
-- sets the flag indicating the reader already have a checkpoint recorded
vCHECKPOINT_EXISTS = 1;
end if;

-- now let's get the records from the queue one-by-one
FOR vRECORD IN
SELECT *
FROM queue_records
WHERE COD > vLAST_CHECKPOINT
ORDER BY COD
LOOP

-- if next record IS EQUALS to (vLAST_CHECKPOINT+1), the record is in the expected order
if (vRECORD.COD = (vLAST_CHECKPOINT+1)) then

-- let's save the last record read
vLAST_CHECKPOINT = vRECORD.COD;

-- and return it
RETURN NEXT vRECORD;

-- but, if it is not, then is out of order
else
-- the reason is some transaction did not commit yet, but there's another further transaction that alread did.
-- so we must stop sending records to the reader. And probably next time he calls, the transaction will have committed already;
exit;
end if;
END LOOP;


-- now we have to persist the last record read to be retrieved on next call
if (vCHECKPOINT_EXISTS = 0) then
INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint) values (pREADER_ID, vLAST_CHECKPOINT);
else
UPDATE queue_reader_checkpoint SET last_checkpoint = vLAST_CHECKPOINT where reader_id = pREADER_ID;
end if;
end;
$BODY$ LANGUAGE plpgsql VOLATILE;

关于PostgreSQL - 实现可靠队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32946852/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com