gpt4 book ai didi

multithreading - 并行事务的 postgresql 插入规则

转载 作者:行者123 更新时间:2023-11-29 12:13:32 26 4
gpt4 key购买 nike

我们有一个多线程应用程序使用的 postgreql 连接池,它永久地将一些记录插入到大表中。因此,假设我们有 10 个数据库连接,执行相同的功能,插入记录。

麻烦的是,结果我们插入了10条记录,同时应该只插入了2-3条记录,如果只有交易可以看到彼此的记录(我们的函数决定不插入记录根据最后一条记录的日期)。

我们无法承受 func 执行期间的表锁定。我们尝试了不同的技术来使数据库立即将我们的规则应用于新记录,尽管它们是在并行事务中创建的,但尚未成功。

因此,如果有任何帮助或想法,我将不胜感激!

更具体地说,这是代码:

schm.events ( evtime TIMESTAMP, ref_id INTEGER, param INTEGER, type INTEGER);

记录过滤规则:

BEGIN
select count(*) into nCnt
from events e
where e.ref_id = ref_id and e.param = param and e.type = type
and e.evtime between (evtime - interval '10 seconds') and (evtime + interval '10 seconds')

if nCnt = 0 then
insert into schm.events values (evtime, ref_id, param, type);
end if;
END;

更新(不幸的是评论长度不够)

我已将唯一索引解决方案应用于生产。结果还算可以,但没有达到最初的目标。问题是,对于唯一的哈希值,我无法控制具有顺序哈希码的 2 条记录之间的间隔。

代码如下:

CREATE TABLE schm.events_hash (
hash_code bigint NOT NULL
);
CREATE UNIQUE INDEX ui_events_hash_hash_code ON its.events_hash
USING btree (hash_code);


--generate the hash codes data by partioning(splitting) evtime in 10 sec intervals:
INSERT into schm.events_hash
select distinct ( cast( trunc( extract(epoch from evtime) / 10 ) || cast( ref_id as TEXT) || cast( type as TEXT ) || cast( param as TEXT ) as bigint) )
from schm.events;

--and then in a concurrently executed function I insert sequentially:
begin
INSERT into schm.events_hash values ( cast( trunc( extract(epoch from evtime) / 10 ) || cast( ref_id as TEXT) || cast( type as TEXT ) || cast( param as TEXT ) as bigint) );
insert into schm.events values (evtime, ref_id, param, type);
end;

在这种情况下,如果 evtime 位于哈希确定的区间内,则只会插入一条记录。情况是,我们可以跳过引用不同确定间隔但彼此接近(小于 60 秒间隔)的记录。

insert into schm.events values ( '2013-07-22 19:32:37', '123', '10', '20' ); --inserted, test ok, (trunc( extract(epoch from cast('2013-07-22 19:32:37' as timestamp)) / 10 ) = 137450715 )
insert into schm.events values ( '2013-07-22 19:32:39', '123', '10', '20' ); --filtered out, test ok, (trunc( extract(epoch from cast('2013-07-22 19:32:39' as timestamp)) / 10 ) = 137450715 )
insert into schm.events values ( '2013-07-22 19:32:41', '123', '10', '20' ); --inserted, test fail, (trunc( extract(epoch from cast('2013-07-22 19:32:41' as timestamp)) / 10 ) = 137450716 )

我认为一定有一种方法可以修改哈希函数来达到最初的目标,但还没有找到。也许,有一些表约束表达式,由postgresql本身在事务之外执行?

最佳答案

关于您唯一的选择是:

  • 使用唯一索引和 hack 将 20 秒范围折叠为单个值;

  • 使用 advisory locking控制通信;或者

  • SERIALIZABLE 隔离并有意在 session 之间创建相互依赖关系。不能 100% 确定这对您的情况是否可行。

您真正想要的是脏读,但 PostgreSQL 不支持脏读,所以您有点卡在那里。

您可能最终需要数据库外部的协调员来管理您的需求。

唯一索引

您可以为了唯一性检查而截断时间戳,将它们四舍五入到常规边界,以便它们以 20 秒为单位跳转。然后将它们添加到 (chunk_time_seconds(evtime, 20), ref_id, param, type) 上的唯一索引。

只有一个插入会成功,其余的将因错误而失败。您可以在 PL/PgSQL 的 BEGIN ... EXCEPTION block 中捕获错误,或者最好只在应用程序中处理它。

我认为 chunk_time_seconds 的合理定义可能是:

CREATE OR REPLACE FUNCTION chunk_time_seconds(t timestamptz, round_seconds integer)
RETURNS bigint
AS $$
SELECT floor(extract(epoch from t) / 20) * 20;
$$ LANGUAGE sql IMMUTABLE;

建议锁定的起点:

可以对单个 bigint 或一对 32 位整数进行咨询锁。你的 key 比那个大,它是三个整数,所以你不能直接使用最简单的方法:

IF pg_try_advisory_lock(ref_id, param) THEN
... do insert ...
END IF;

然后 10 秒后,在同一连接上(但不一定在同一事务中)发出 pg_advisory_unlock(ref_id_param)

它不会工作,因为您还必须根据 type 进行过滤,并且没有 pg_advisory_lock 的三整数参数形式。如果您可以将 paramtype 转换为 smallints,您可以:

IF pg_try_advisory_lock(ref_id, param << 16 + type) THEN

但除此之外,您会遇到麻烦。当然,您可以对这些值进行散列处理,但是这样会冒(小)错误地跳过在发生散列冲突的情况下不应跳过的插入内容的风险。无法触发重新检查,因为冲突的行不可见,因此您不能使用仅比较行的通常解决方案。

所以...如果您可以将 key 放入 64 位,并且您的应用程序可以处理在同一连接中释放之前持有锁 10-20 秒的需要,建议锁将对您有用并且非常有用低开销。

关于multithreading - 并行事务的 postgresql 插入规则,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18062419/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com