
laravel - How to listen to Postgres LISTEN/NOTIFY in Laravel?

Repost. Author: 行者123. Updated: 2023-12-04 17:29:08

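My task in general
I need to listen for changes (CRUD) to a Postgres table, for example changes made through a database manager such as DBeaver, and pass the ID of the updated row to a Laravel-driven API endpoint.
What I have
Postgres part
In Postgres, I created a table, a trigger on that table, and a function that handles the event on the Postgres side.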

CREATE TABLE PUBLIC.TBLEXAMPLE
(
    KEY1 CHARACTER VARYING(10) NOT NULL,
    KEY2 CHARACTER VARYING(14) NOT NULL,
    VALUE1 CHARACTER VARYING(20),
    VALUE2 CHARACTER VARYING(20) NOT NULL,
    CONSTRAINT TBLEXAMPLE_PKEY PRIMARY KEY (KEY1, KEY2)
);

CREATE OR REPLACE FUNCTION PUBLIC.NOTIFY() RETURNS trigger AS
$BODY$
BEGIN
    PERFORM pg_notify('myevent', row_to_json(NEW)::text);
    RETURN new;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE COST 100;

CREATE TRIGGER TBLEXAMPLE_AFTER
    AFTER insert or update or delete
    ON PUBLIC.TBLEXAMPLE
    FOR EACH ROW
    EXECUTE PROCEDURE PUBLIC.NOTIFY();

PHP part
I have a basic PHP script that is meant to be run from the CLI. When I run it, I receive notifications about updates to the PG table.
<?php
$db = new PDO(
    "pgsql:dbname=database host=localhost port=5432", 'postgres', 'password', [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
    ]
);

$db->exec('LISTEN myevent');
echo 'Starting';

while (true) {
    // Blocks for up to 30 seconds; returns false when nothing arrived.
    while ($result = $db->pgsqlGetNotify(PDO::FETCH_ASSOC, 30000)) {
        echo print_r($result, true) . PHP_EOL;
    }
}

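This is how it looks (the screenshot is not available).
Question
What is the correct way to run the PHP script above as part of Laravel?
Please point me to something to read, perhaps a similar solution.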

I know the buzzwords "worker" and "queue", and I use php artisan queue:work in my API (a user requests an endpoint, which adds jobs to the queue). In this case, though, the role of the user should be played by the PHP script logic above.

My suggestion: I probably need to develop something like php artisan listen2postgres with the logic from above and run it under Supervisor, similar to php artisan queue:work. Can this work?

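Best answer

As some of the comments pointed out, you can write a custom Artisan command for this. Running the command starts "listening" for trigger events.
In PostgreSQL you can create a trigger that acts like a subscriber in Laravel: it watches a table for changes, collects data such as the table name and the action, and passes it to Laravel via pg_notify. This assumes Laravel already has a connection to this database defined. You can use PostgreSQL's JSON functions to encode the data you want to pass to Laravel as a JSON string, and then decode that string on the Laravel side.
Below is a simple example that watches for UPDATE, INSERT, or DELETE on an orders table or a users table.
Trigger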

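create function notify_event() returns trigger
    language plpgsql
as
$$
DECLARE
    notification json;
BEGIN
    -- PostgreSQL auto-defined variables:
    --   TG_OP         ~ action such as INSERT, DELETE, UPDATE
    --   TG_TABLE_NAME ~ name of the table that fired the trigger

    -- Construct the notification as a JSON string.
    notification = json_build_object(
        'table', TG_TABLE_NAME,
        'action', TG_OP);

    -- Execute pg_notify(channel, notification)
    PERFORM pg_notify('events', notification::text);

    -- Result is ignored since this is an AFTER trigger
    RETURN NULL;
END;
$$;

-- The owner must be a role (user) name; replace the placeholder accordingly.
alter function notify_event() owner to YOUR_DATABASE_NAME_HERE;

-- Assumption: the function still has to be attached to each watched table.
-- A sketch for orders_table (the trigger name is illustrative); repeat for users_table.
create trigger orders_table_notify
    after insert or update or delete on orders_table
    for each row execute procedure notify_event();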
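
To illustrate the receiving side of the JSON payload built by the trigger function above, here is a minimal plain-PHP sketch. The helper name parseNotification and the sample array are hypothetical; the array shape (message, pid, payload) assumes a notification fetched with PDO::pgsqlGetNotify(PDO::FETCH_ASSOC, ...), and the "table" and "action" keys assume the payload produced by notify_event().

```php
<?php

/**
 * Decode one notification into its channel, table name and action.
 * Returns null when the payload is not the expected JSON object.
 * (Hypothetical helper, not part of Laravel or PDO.)
 */
function parseNotification(array $notification): ?array
{
    // The payload arrives as a JSON-encoded string.
    $payload = json_decode($notification['payload'] ?? '', true);

    if (!is_array($payload) || !isset($payload['table'], $payload['action'])) {
        return null;
    }

    return [
        'channel' => $notification['message'] ?? null,
        'table'   => $payload['table'],
        'action'  => strtoupper((string) $payload['action']),
    ];
}

// Hypothetical notification, shaped like the array pgsqlGetNotify() returns.
$sample = [
    'message' => 'events',
    'pid'     => 12345,
    'payload' => '{"table":"orders_table","action":"UPDATE"}',
];

print_r(parseNotification($sample));
```

Returning null for anything that does not decode to the expected object lets a long-running listener skip malformed notifications instead of crashing.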
Artisan command
<?php

namespace App\Console\Commands;

use App\Events\OrderCreated;
use App\Events\OrderDeleted;
use App\Events\OrderUpdated;
use App\Events\UserCreated;
use App\Events\UserDeleted;
use App\Events\UserUpdated;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;

/**
 * Class SubscribeToTriggers
 *
 * @package App\Console\Commands
 */
class SubscribeToTriggers extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'psql:subscribe-to-triggers {--t|table=* : Tables to synchronize.} {--timeout=30 : Seconds to wait for a notification before polling again.}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Listen for changes on database and update the platform accordingly';

    /**
     * Tables to synchronize.
     *
     * @var array
     */
    protected $tables;

    /**
     * Observers keyed by "table" or "table@action".
     *
     * @var array
     */
    private $subscribers = [];

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();

        $this->tables = [];
        $this->subscribe();
    }

    /**
     * Execute the console command.
     *
     * @return void
     */
    public function handle()
    {
        // Seconds to block while waiting for a notification.
        $timeout = (int) $this->option('timeout');

        // --table is declared as an array option (table=*).
        $this->tables = (array) $this->option('table');

        try {
            // Connection name as defined in config/database.php.
            $dbo = DB::connection('DATABASE_NAME_HERE')->getPdo();
            $dbo->exec('LISTEN "events"');

            while (true) {
                // Returns false when the timeout expires without a notification.
                $event = $dbo->pgsqlGetNotify(\PDO::FETCH_ASSOC, $timeout * 1000);

                if ($event === false) {
                    continue;
                }

                if ($this->output->isDebug()) {
                    $this->line(json_encode($event));
                }

                $payload = json_decode($event['payload']);
                if (!is_object($payload) || !isset($payload->table, $payload->action)) {
                    continue;
                }

                $table = $payload->table;
                $action = $payload->action;
                // The trigger above sends only "table" and "action"; these stay
                // null unless the trigger is extended to include row data.
                $original = $payload->original ?? null;
                $data = $payload->data ?? null;

                // Honor --table: skip tables that were not requested.
                if (!empty($this->tables) && !in_array($table, $this->tables, true)) {
                    continue;
                }

                // Prefer a "table@action" observer, then fall back to "table".
                $observer = null;
                $subject = implode('@', [$table, strtolower($action)]);
                if (array_key_exists($subject, $this->subscribers)) {
                    $observer = $this->subscribers[$subject];
                } elseif (array_key_exists($table, $this->subscribers)) {
                    $observer = $this->subscribers[$table];
                }

                if (isset($observer) && method_exists($this, $observer->handler)) {
                    $handler = $observer->handler;
                    $this->$handler($data, $action, $original);
                }
            }
        } catch (\Throwable $e) {
            logger($e->getMessage());
        }
    }

    /**
     * Set up observers to handle events on a table.
     *
     * @param string $entity  "table" or "table@action"
     * @param string $handler Name of the method on this command to call
     */
    private function listen($entity, $handler)
    {
        $info = explode('@', $entity);
        $table = $info[0];
        $action = count($info) > 1 ? $info[1] : null;

        $observer = new \stdClass();
        $observer->table = $table;
        $observer->action = $action;
        $observer->handler = $handler;
        $subject = !empty($action) ? implode('@', [$table, strtolower($action)]) : $table;
        $this->subscribers[$subject] = $observer;
    }

    /**
     * Subscribe to modification events on these tables.
     */
    private function subscribe()
    {
        $this->listen('orders_table', 'onOrder');
        $this->listen('users_table', 'onUser');
    }

    /**
     * @param $order
     * @param null $action
     * @param null $original
     */
    protected function onOrder($order, $action = null, $original = null)
    {
        $event = null;

        if ($action === 'INSERT') {
            $event = new OrderCreated();
        } elseif ($action === 'UPDATE') {
            $event = new OrderUpdated();
        } elseif ($action === 'DELETE') {
            $event = new OrderDeleted();
        }

        if (!is_null($event)) {
            event($event);
        }
    }

    /**
     * @param $user
     * @param null $action
     * @param null $original
     */
    protected function onUser($user, $action = null, $original = null)
    {
        $event = null;

        if ($action === 'INSERT') {
            $event = new UserCreated();
        } elseif ($action === 'UPDATE') {
            $event = new UserUpdated();
        } elseif ($action === 'DELETE') {
            $event = new UserDeleted();
        }

        if (!is_null($event)) {
            event($event);
        }
    }
}
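You would then write a Laravel subscriber that defines event listeners for each of the events dispatched by the command:
  • OrderCreated / OrderUpdated / OrderDeleted
  • UserCreated / UserUpdated / UserDeleted

Make sure to register this subscriber in the $subscribe array of the EventServiceProvider.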
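
One possible shape for such a subscriber is sketched below. The class name TriggerEventSubscriber, its namespace, and the handler bodies are hypothetical and not from the original answer; the 'Class@method' listener strings assume a Laravel version whose event dispatcher accepts that form in listen(). The event classes are the ones imported by the Artisan command above.

```php
<?php

namespace App\Listeners;

use App\Events\OrderCreated;
use App\Events\OrderDeleted;
use App\Events\OrderUpdated;
use App\Events\UserCreated;
use App\Events\UserDeleted;
use App\Events\UserUpdated;

// Hypothetical subscriber: one handler per table, shared by its three events.
class TriggerEventSubscriber
{
    /**
     * React to any order event (created, updated, deleted).
     */
    public function handleOrderChange($event)
    {
        // Placeholder body: replace with the real synchronization logic.
        logger('Order event received: ' . get_class($event));
    }

    /**
     * React to any user event (created, updated, deleted).
     */
    public function handleUserChange($event)
    {
        // Placeholder body: replace with the real synchronization logic.
        logger('User event received: ' . get_class($event));
    }

    /**
     * Register the listeners for the subscriber.
     *
     * @param \Illuminate\Events\Dispatcher $events
     */
    public function subscribe($events)
    {
        $orderEvents = [OrderCreated::class, OrderUpdated::class, OrderDeleted::class];
        $userEvents = [UserCreated::class, UserUpdated::class, UserDeleted::class];

        foreach ($orderEvents as $eventClass) {
            $events->listen($eventClass, self::class . '@handleOrderChange');
        }

        foreach ($userEvents as $eventClass) {
            $events->listen($eventClass, self::class . '@handleUserChange');
        }
    }
}
```

For the listeners to be picked up, TriggerEventSubscriber::class would be added to the $subscribe array of App\Providers\EventServiceProvider. Sharing one handler per table keeps the sketch short; separate handlers per event work the same way.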
References
  • PostgreSQL Triggers
  • PostgreSQL JSON and Functions
  • Laravel Events: Subscribers

Regarding "laravel - How to listen to Postgres LISTEN/NOTIFY in Laravel?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61263632/
