如何在 Laravel 中收听 Postgres listen/notify?

How to listen to Postgres listen/notify in Laravel?

我的总体任务

我需要听取 Postgres table 的更改 (CRUD),例如通过像 DBeaver 这样的数据库管理器并将更新的行 ID 传递给 laravel 驱动的 API enpoint.

我有什么

Postgres 部分

在 Postgres 中,我创建了一个 table,一个触发器 table 和一个在 postgres 端处理事件的函数

CREATE TABLE PUBLIC.TBLEXAMPLE
(
  KEY1 CHARACTER VARYING(10) NOT NULL,
  KEY2 CHARACTER VARYING(14) NOT NULL,
   VALUE1 CHARACTER VARYING(20),
  VALUE2 CHARACTER VARYING(20) NOT NULL,
   CONSTRAINT TBLEXAMPLE_PKEY PRIMARY KEY (KEY1, KEY2)
);

CREATE OR REPLACE FUNCTION PUBLIC.NOTIFY() RETURNS trigger AS
$BODY$
BEGIN
  PERFORM pg_notify('myevent', row_to_json(NEW)::text);
  RETURN new;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE COST 100;


CREATE TRIGGER TBLEXAMPLE_AFTER
AFTER insert or update or delete 
ON PUBLIC.TBLEXAMPLE
FOR EACH ROW
EXECUTE PROCEDURE PUBLIC.NOTIFY();

PHP部分

我有一个基本的 PHP 脚本,它是 CLI 中的 运行。当我 运行 它时,我会在 PG table

中收到更新通知
<?php
$db = new PDO(
    "pgsql:dbname=database host=localhost port=5432", 'postgres', 'password', [
        PDO::ATTR_ERRMODE            => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
    ]
);

$db->exec('LISTEN myevent');
echo 'Starting';

while(true) {
    while ($result = $db->pgsqlGetNotify(PDO::FETCH_ASSOC, 30000)) {
        echo print_r($result, true) . PHP_EOL;
    }
}

这是它的样子

问题

运行 将上述 PHP 脚本作为 laravel 部分的正确方法是什么?

请指出要阅读的内容,也许是类似的解决方案。

I know clever words like "worker", "queue", I use php artisan queue:work in my API (A user requests an endpoint which adds jobs to the queue). But in this case the role of the user should be performed by the php script logic above.

My suggestion. I probably must develop something like php artisan listen2posrgres with the logic from above and run it similar to php artisan queue:work throughout supervisor. Can this work?

正如一些评论所指出的,您可以为此编写自定义 Artisan 命令。 运行 命令开始“侦听”触发事件。

在 PostgreSQL 中,您可以在 Laravel 中创建一个类似于订阅者的触发器 -- 观察对 table 的更改,它收集 table 名称和操作等数据如果 Laravel 定义了与该数据库的连接,则通过 pg_notify 将其传递给 Laravel。您可以使用 JSON 处理对要传递给 Laravel 的数据进行编码,然后将其解析为 JSON 编码为字符串。

这是一个简单的示例,用于监视订单或用户 table 的更新、插入或删除。

触发器

create function notify_event() returns trigger
    language plpgsql
as
$$
DECLARE
        notification json;
    BEGIN

        -- PostgreSQL auto-defined variables:
        -- TG_OP    ~   action such as INSERT, DELETE, UPDATE
        -- TG_TABLE_NAME

        -- Contruct the notification as a JSON string.
        notification = json_build_object(
                          'table',TG_TABLE_NAME,
                          'action', TG_OP);


        -- Execute pg_notify(channel, notification)
        PERFORM pg_notify('events', notification::text);

        -- Result is ignored since this is an AFTER trigger
        RETURN NULL;
    END;

$$;

alter function notify_event() owner to YOUR_DATABASE_NAME_HERE;

Artisan 命令

<?php

namespace App\Console\Commands;

use App\Events\OrderCreated;
use App\Events\OrderDeleted;
use App\Events\OrderUpdated;
use App\Events\UserCreated;
use App\Events\UserDeleted;
use App\Events\UserUpdated;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;

/**
 * Class SubscribeToTriggers
 *
 * @package App\Console\Commands
 */
class SubscribeToTriggers extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'psql:subscribe-to-triggers {--t|table=* : Tables to synchronize.}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Listen for changes on database and update the platform accordingly';

    /**
     *  Tables to synchronize.
     *
     * @var array
     */
    protected $tables;

    /**
     * @var
     */
    private $subscribers;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();

        $this->tables = [];
        $this->subscribe();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $timeout = (int) $this->option('timeout');

        if ($table = $this->option('table')) {
            if (is_array($table)) {
                $this->tables = $table;
            } else {
                $this->tables[] = $table;
            }
        }

        try {
            $dbo = DB::connection('DATABASE_NAME_HERE')->getPdo();
            $dbo->exec('LISTEN "events"');
            while (true) {
                $event = $dbo->pgsqlGetNotify(\PDO::FETCH_ASSOC, $timeout * 1000);

                if ($this->output->isDebug()) {
                    $this->getOutput()->write($event);
                    $this->getOutput()->write(PHP_EOL);
                }

                $payload = json_decode($event['payload']);
                $table = $payload->table;
                $action = $payload->action;
                $original = $payload->original;
                $data = $payload->data;

                $observer = null;
                $subject = implode('@', [$table, strtolower($action)]);
                if (array_key_exists($subject, $this->subscribers)) {
                    $observer = $this->subscribers[$subject];
                } else if (array_key_exists($table, $this->subscribers)) {
                    $observer = $this->subscribers[$table];
                }
                if (isset($observer) && method_exists($this, $observer->handler)) {
                    $handler = $observer->handler;
                    $this->$handler($data, $action, $original);
                }
     
            }
        } catch (Exception $e) {
            logger($e->getMessage());
        }
    }

    /**
     * Set up observers to handle events on a table.
     *
     * @param $entity
     * @param $handler
     */
    private function listen($entity, $handler)
    {
        if (!isset($this->subscribers)) {
            $this->subscribers = [];
        }

        $info = explode('@', $entity);
        $table = $info[0];
        $action = count($info) > 1 ? $info[1] : null;

        $observer = new \stdClass();
        $observer->table = $table;
        $observer->action = $action;
        $observer->handler = $handler;
        $subject = !empty($action) ? implode('@', [$table, strtolower($action)]) : $table;
        $this->subscribers[$subject] = $observer;
    }

    /**
     * Subscribe to modification events on these tables.
     */
    private function subscribe()
    {
        $this->listen('orders_table', 'onOrder');
        $this->listen('users_table', 'onUser');
    }

    /**
     * @param $order
     * @param null $action
     * @param null $original
     */
    protected function onOrder($order, $action = null, $original = null)
    {
        $event = null;

        if ($action == 'INSERT') {
            $event = new OrderCreated();
        } else if ($action === 'UPDATE') {
            $event = new OrderUpdated();
        } else if ($action == 'DELETE') {
            $event = new OrderDeleted();
        }

        if (!is_null($event)) {
            event($event);
        }
    }

    /**
     * @param $user
     * @param null $action
     * @param null $original
     */
    protected function onUser($user, $action = null, $original = null)
    {
        $event = null;

        if ($action == 'INSERT') {
            $event = new UserCreated();
        } else if ($action === 'UPDATE') {
            $event = new UserUpdated();
        } else if ($action == 'DELETE') {
            $event = new UserDeleted();
        }

        if (!is_null($event)) {
            event($event);
        }
    }

}

然后您将编写一个 Laravel 订阅者来为命令中定义的每个事件定义 EventListeners:

  • 订单创建/订单更新/订单删除
  • 用户创建/用户更新/用户删除

并确保在 EventServiceProvider 的 $subscribers 块中注册此订阅者。

参考文献

PostgreSQL Triggers PostgreSQL JSON and Functions Laravel Events: Subscribers