gpt4 book ai didi

python - Postgres : Is there a way of executing code following a INSERT statement?

转载 作者:行者123 更新时间:2023-11-29 13:02:30 27 4
gpt4 key购买 nike

这可能看起来很奇怪,但我很好奇是否有可能在 postgres 数据库中的 INSERT 语句之后执行代码块?

具体来说,我对在 pg 数据库中出现 INSERT 语句后执行 Python 代码很感兴趣。

最佳答案

有几种方法可以解决这个问题。

听/通知

解决这个问题的简单方法是使用 postgresql notifications .

您可以在插入/更新触发器之后添加通知:

CREATE OR REPLACE FUNCTION on_insert() RETURNS trigger AS
$$
BEGIN

execute E'NOTIFY ENTITY_CHANGE, \'' || NEW.id || E'\'';

RETURN NEW;
END
$$
LANGUAGE 'plpgsql' VOLATILE;

create trigger trig_on_insert
after insert on ENTITY
for each row
execute procedure on_insert_to_t();

ENTITY_CHANGE 是您可以随意使用的 channel 的标识符。

你的申请应该listen在单独的线程(或进程)中执行它并执行所需的操作:

from django.db import connection

curs = connection.cursor()
curs.execute("LISTEN ENTITY_CHANGED;")

while not_finish:
if select.select([connection],[],[],5) == ([],[],[]):
print "Timeout"
else:
connection.poll()
while connection.notifies:
notify = connection.notifies.pop()
entity_id = notify.payload
do_post_save(entity_id)

唯一需要注意的是,通知不是事务性的,如果发生灾难性故障,通知可能会丢失。当您的应用程序收到通知但在完成处理通知之前崩溃(或被杀死)时,这种情况就会发生,这样的通知将永远丢失。

如果您需要保证保存后处理总是发生,您需要维护一些任务表。在插入/更新触发器应该向该表添加任务并且一些 python 进程应该轮询该表并进行所需的处理。缺点是轮询 - 当系统不保存实体时,它会进行不必要的查询。

您可以结合这两种方法来获得最好的方法,即使用通知开始处理,但处理器应该从由触发器填充的任务表中获取任务。在您的应用程序启动过程中,应该运行处理以完成未完成的工作(如果有的话)。

逻辑复制

更好更可靠的方法是使用 logical replication .

此选项使用 transaction log并且消费者确认收到了更改通知,因此不会遗漏任何通知并且可以可靠地交付。

为了证明这一点,我在这里使用 image为逻辑复制预配置并安装了 wal2json WAL解码插件:

docker run -d --name "logical" -e POSTGRES_PASSWORD=123 -p 10000:5432 -d debezium/postgres:14

这是一个消费者的例子:

import psycopg2
from psycopg2.errors import UndefinedObject
from psycopg2.extras import LogicalReplicationConnection

my_connection = psycopg2.connect(
"dbname='postgres' host='localhost' port='10000' user='postgres' password='123'",
connection_factory=LogicalReplicationConnection,
)
cur = my_connection.cursor()
try:
cur.drop_replication_slot("wal2json_test_slot")
except UndefinedObject:
pass
cur.create_replication_slot("wal2json_test_slot", output_plugin="wal2json")
cur.start_replication(
slot_name="wal2json_test_slot", options={"pretty-print": 1}, decode=True
)

def consume(msg):
print(msg.payload)
msg.cursor.send_feedback(flush_lsn=msg.data_start)

cur.consume_stream(consume)

现在执行像 insert into table1 values (1, 'hello') 这样的插入会产生这个:

{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table1",
"columnnames": ["i", "t"],
"columntypes": ["integer", "text"],
"columnvalues": [1, "hello"]
}
]
}

关于python - Postgres : Is there a way of executing code following a INSERT statement?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25215600/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com