SQLSQL DDL 中的网站触发器和日期时间默认值在 Python 中使用 Peewee

SQLite triggers & datetime defaults in SQL DDL using Peewee in Python

我有一个 SQLite table 定义如下:

create table if not exists KeyValuePair (
    key         CHAR(255) primary key not null,
    val         text not null,
    fup         timestamp default current_timestamp not null,  -- time of first upload
    lup         timestamp default current_timestamp not null  -- time of last upload
);

create trigger if not exists entry_first_insert after insert
on KeyValuePair
begin
    update KeyValuePair set lup = current_timestamp where key = new.key;
end;

create trigger if not exists entry_last_updated after update of value
on KeyValuePair
begin
    update KeyValuePair set lup = current_timestamp where key = old.key;
end;

我正在尝试为Python中的这个table写一个peewee.Model。这是我目前所拥有的:

import peewee as pw


db = pw.SqliteDatabase('dhm.db')
class BaseModel(pw.Model):
    class Meta:
        database = db

class KeyValuePair(BaseModel):
    key = pw.FixedCharField(primary_key=True, max_length=255)
    val = pw.TextField(null=False)
    fup = pw.DateTimeField(
        verbose_name='first_updated', null=False, default=datetime.datetime.now)
    lup = pw.DateTimeField(
        verbose_name='last_updated', null=False, default=datetime.datetime.now)

db.connect()
db.create_tables([KeyValuePair])

当我检查最后一行生成的 SQL 时,我得到:

CREATE TABLE "keyvaluepair" (
    "key" CHAR(255) NOT NULL PRIMARY KEY,
    "val" TEXT NOT NULL,
    "fup" DATETIME NOT NULL,
    "lup" DATETIME NOT NULL
);

所以我现在有两个问题:

  1. 我一直无法找到一种方法来实现 entry_first_insertentry_last_updated 触发器的行为。 peewee 支持触发器吗?如果没有,有没有办法从 .sql 文件而不是 Model class 定义创建 table?
  2. 有没有办法使 fuplup 的默认值传播到 SQL 定义?

您可以覆盖插入时间戳的模型的保存功能。有关示例,请参阅 TimeStampModel

我已经找到了两个问题的正确答案。该解决方案实际上在 SQL DDL 中强制执行所需的触发器和默认时间戳。

首先我们定义一个方便的 class 来包装触发器的 SQL。对于 peewee.Node 对象,有一种更合适的方法来执行此操作,但我没有时间为这个项目深入研究所有这些。此 Trigger class 只是提供字符串格式以输出正确的 sql 以创建触发器。

class Trigger(object):
    """Trigger template wrapper for use with peewee ORM."""

    _template = """
    {create} {name} {when} {trigger_op}
    on {tablename}
    begin
        {op} {tablename} {sql} where {pk} = {old_new}.{pk};
    end;
    """

    def __init__(self, table, name, when, trigger_op, op, sql, safe=True):
        self.create = 'create trigger' + (' if not exists' if safe else '')
        self.tablename = table._meta.name
        self.pk = table._meta.primary_key.name
        self.name = name
        self.when = when
        self.trigger_op = trigger_op
        self.op = op
        self.sql = sql
        self.old_new = 'new' if trigger_op.lower() == 'insert' else 'old'

    def __str__(self):
        return self._template.format(**self.__dict__)

接下来我们定义一个继承自BaseModel的classTriggerTable。此 class 覆盖默认 create_table 以跟随 table 创建触发器创建。如果任何触发器创建失败,则整个创建回滚。

class TriggerTable(BaseModel):
    """Table with triggers."""

    @classmethod
    def triggers(cls):
        """Return an iterable of `Trigger` objects to create upon table creation."""
        return tuple()

    @classmethod
    def new_trigger(cls, name, when, trigger_op, op, sql):
        """Create a new trigger for this class's table."""
        return Trigger(cls, name, when, trigger_op, op, sql)

    @classmethod
    def create_table(cls, fail_silently=False):
        """Create this table in the underlying database."""
        super(TriggerTable, cls).create_table(fail_silently)
        for trigger in cls.triggers():
            try:
                cls._meta.database.execute_sql(str(trigger))
            except:
                cls._meta.database.drop_table(cls, fail_silently)
                raise

下一步是创建 class BetterDateTimeField。如果 default 实例变量设置为 datetime.datetime.now 函数,此 Field 对象将覆盖默认 __ddl__ 以附加 "DEFAULT current_timestamp" 字符串。当然有更好的方法来做到这一点,但这个方法抓住了基本用例。

class BetterDateTimeField(pw.DateTimeField):
    """Propogate defaults to database layer."""

    def __ddl__(self, column_type):
        """Return a list of Node instances that defines the column."""
        ddl = super(BetterDateTimeField, self).__ddl__(column_type)
        if self.default == datetime.datetime.now:
            ddl.append(pw.SQL('DEFAULT current_timestamp'))
        return ddl

最后,我们定义了新的和改进的 KeyValuePair 模型,结合了我们的触发器和日期时间字段改进。我们通过创建 table.

来结束 Python 代码
class KeyValuePair(TriggerTable):
    """DurableHashMap entries are key-value pairs."""

    key = pw.FixedCharField(primary_key=True, max_length=255)
    val = pw.TextField(null=False)
    fup = BetterDateTimeField(
        verbose_name='first_updated', null=False, default=datetime.datetime.now)
    lup = BetterDateTimeField(
        verbose_name='last_updated', null=False, default=datetime.datetime.now)

    @classmethod
    def triggers(cls):
        return (
            cls.new_trigger(
                'kvp_first_insert', 'after', 'insert', 'update',
                'set lup = current_timestamp'),
            cls.new_trigger(
                'kvp_last_udpated', 'after', 'update', 'update',
                'set lup = current_timestamp')
        )

KeyValuePair.create_table()

现在架构已正确创建:

sqlite> .schema keyvaluepair
CREATE TABLE "keyvaluepair" ("key" CHAR(255) NOT NULL PRIMARY KEY, "val" TEXT NOT NULL, "fup" DATETIME NOT NULL DEFAULT current_timestamp, "lup" DATETIME NOT NULL DEFAULT current_timestamp);
CREATE TRIGGER kvp_first_insert after insert
    on keyvaluepair
    begin
        update keyvaluepair set lup = current_timestamp where key = new.key;
    end;
CREATE TRIGGER kvp_last_udpated after update
    on keyvaluepair
    begin
        update keyvaluepair set lup = current_timestamp where key = old.key;
    end;
sqlite> insert into keyvaluepair (key, val) values ('test', 'test-value');
sqlite> select * from keyvaluepair;
test|test-value|2015-12-07 21:58:05|2015-12-07 21:58:05
sqlite> update keyvaluepair set val = 'test-value-two' where key = 'test';
sqlite> select * from keyvaluepair;
test|test-value-two|2015-12-07 21:58:05|2015-12-07 21:58:22

我前一段时间偶然发现了这个问题,并花了一些时间想出一个优化设计来支持 PeeWee 中的触发器(受上述答案的启发)。我对我们最终如何实施它感到非常满意,并想分享这一点。在某些时候,我会为此对 Peewee 进行 PR。

在 PeeWee 中创建触发器和触发器监听器

Objective

本文档分两部分介绍如何执行此操作:

  • 如何将 Trigger 添加到数据库中的模型。
  • 如何创建一个 ListenThread 将有一个回调函数,每次 table 更新时都会收到通知。

实施方法

这种设计的美妙之处在于您只需要一项:TriggerModelMixin 模型。然后很容易为 subscribe/have 回调方法创建侦听器。

TriggerModelMixin 可以复制粘贴为:

class TriggerModelMixin(Model):
    """ PeeWee Model with support for triggers.
    This will create a trigger that on all table updates will send
    a NOTIFY to {tablename}_updates.
    Note that it will also take care of updating the triggers as
    appropriate/necesary.
    """

    _template = """
    CREATE OR REPLACE FUNCTION {function_name}()
        RETURNS trigger AS
        $BODY$
        BEGIN
          PERFORM pg_notify(
            CAST('{notify_channel_name}' AS text),
            row_to_json(NEW)::text);
          RETURN NEW;
        END;
        $BODY$
        LANGUAGE plpgsql VOLATILE
        COST 100;
        ALTER FUNCTION {function_name}() OWNER TO postgres;

    DROP TRIGGER IF EXISTS {trigger_name} ON "{tablename}";

    CREATE TRIGGER {trigger_name}
        AFTER INSERT OR UPDATE OR DELETE
        ON "{tablename}"
        {frequency}
        EXECUTE PROCEDURE {function_name}();
    """

    function_name_template = "{table_name}updatesfunction"
    trigger_name_template = "{table_name}updatestrigger"
    notify_channel_name_template = "{table_name}updates"
    frequency = "FOR EACH ROW"

    @classmethod
    def get_notify_channel(cls):
        table_name = cls._meta.table_name
        return cls.notify_channel_name_template.format(**{"table_name": table_name})

    @classmethod
    def create_table(cls, fail_silently=False):
        """ Create table and triggers """
        super(TriggerModelMixin, cls).create_table()

        table_name = cls._meta.table_name
        notify_channel = cls.get_notify_channel()
        function_name = cls.function_name_template.format(**{"table_name": table_name})
        trigger_name = cls.trigger_name_template.format(**{"table_name": table_name})

        trigger = cls._template.format(**{
                                            "function_name": function_name,
                                            "trigger_name": trigger_name,
                                            "notify_channel_name": notify_channel,
                                            "tablename": table_name,
                                            "frequency": cls.frequency
                                         }
                                       )
        logger.info(f"Creating Triggers for {cls}")
        cls._meta.database.execute_sql(str(trigger))

    @classmethod
    def create_db_listener(cls):
        ''' Returns an object that will listen to the database notify channel
        and call a specified callback function if triggered.
        '''

        class Trigger_Listener:
            def __init__(self, db_model):
                self.db_model = db_model
                self.running = True
                self.test_mode = False
                self.channel_name = ""

            def stop(self):
                self.running = False

            def listen_and_call(self, f, *args, timeout: int = 5, sync=False):
                ''' Start listening and call the callback method `f` if a
                trigger notify is received.
                This has two styles: sync (blocking) and async (non-blocking)
                Note that `f` must have `record` as a keyword parameter - this
                will be the record that sent the notification.
                '''
                if sync:
                    return self.listen_and_call_sync(f, *args, timeout=timeout)
                else:
                    t = threading.Thread(
                        target=self.listen_and_call_sync,
                        args=(f, *args),
                        kwargs={'timeout': timeout}
                    )
                    t.start()

            def listen_and_call_sync(self, f, *args, timeout: int = 5):
                ''' Call callback function `f` when the channel is notified. '''
                self.channel_name = self.db_model.get_notify_channel()
                db = self.db_model._meta.database

                db.execute_sql(f"LISTEN {self.channel_name};")
                conn = db.connection()
                while self.running:
                    # The if see's if the response is non-null
                    if not select.select([conn], [], [], timeout) == ([], [], []):
                        # Wait for the bytes to become fully available in the buffer
                        conn.poll()
                        while conn.notifies:
                            record = conn.notifies.pop(0)
                            logger.info(f"Trigger recieved with record {record}")
                            f(*args, record=record)
                    if self.test_mode:
                        break

        return Trigger_Listener(cls)

示例实现:

db_listener = FPGExchangeOrder.create_db_listener()

def callback_method(record=None):
     # CallBack Method to handle the record.
     logger.info(f"DB update on record: f{record}")
     # Handle the update here

db_listener.listen_and_call(callback_method)

如何使用这个

1。向数据库中的模型添加触发器

这很容易。只需将 mixin TriggerModelMixin 添加到您想要添加支持的模型中。此 Mixin 将处理触发器的创建,以及在调用触发器时通知的监听方法。

2。创建一个 ListenThread 以进行回调

我们有两种侦听器模式:async(非阻塞)和sync(阻塞)。默认情况下,它将是 non-blocking,如果您希望它被阻塞,您可以使用 sync=True 更改它。

要使用它(在任何一种情况下),请创建一个回调方法。请注意,此回调方法将在收到更新时阻塞(记录是串行处理的),因此请不要在该方法中负载过重或 I/O。此方法的唯一要求是 record 的键控参数 - 这将是数据库中的记录作为字典返回的地方。

由此,只需创建侦听器,然后调用 listen_and_call