SQLSQL DDL 中的网站触发器和日期时间默认值在 Python 中使用 Peewee
SQLite triggers & datetime defaults in SQL DDL using Peewee in Python
我有一个 SQLite table 定义如下:
create table if not exists KeyValuePair (
key CHAR(255) primary key not null,
val text not null,
fup timestamp default current_timestamp not null, -- time of first upload
lup timestamp default current_timestamp not null -- time of last upload
);
create trigger if not exists entry_first_insert after insert
on KeyValuePair
begin
update KeyValuePair set lup = current_timestamp where key = new.key;
end;
create trigger if not exists entry_last_updated after update of value
on KeyValuePair
begin
update KeyValuePair set lup = current_timestamp where key = old.key;
end;
我正在尝试为Python中的这个table写一个peewee.Model
。这是我目前所拥有的:
import peewee as pw
db = pw.SqliteDatabase('dhm.db')
class BaseModel(pw.Model):
class Meta:
database = db
class KeyValuePair(BaseModel):
key = pw.FixedCharField(primary_key=True, max_length=255)
val = pw.TextField(null=False)
fup = pw.DateTimeField(
verbose_name='first_updated', null=False, default=datetime.datetime.now)
lup = pw.DateTimeField(
verbose_name='last_updated', null=False, default=datetime.datetime.now)
db.connect()
db.create_tables([KeyValuePair])
当我检查最后一行生成的 SQL 时,我得到:
CREATE TABLE "keyvaluepair" (
"key" CHAR(255) NOT NULL PRIMARY KEY,
"val" TEXT NOT NULL,
"fup" DATETIME NOT NULL,
"lup" DATETIME NOT NULL
);
所以我现在有两个问题:
- 我一直无法找到一种方法来实现
entry_first_insert
和 entry_last_updated
触发器的行为。 peewee
支持触发器吗?如果没有,有没有办法从 .sql 文件而不是 Model
class 定义创建 table?
- 有没有办法使
fup
和 lup
的默认值传播到 SQL 定义?
您可以覆盖插入时间戳的模型的保存功能。有关示例,请参阅 TimeStampModel。
我已经找到了两个问题的正确答案。该解决方案实际上在 SQL DDL 中强制执行所需的触发器和默认时间戳。
首先我们定义一个方便的 class 来包装触发器的 SQL。对于 peewee.Node
对象,有一种更合适的方法来执行此操作,但我没有时间为这个项目深入研究所有这些。此 Trigger
class 只是提供字符串格式以输出正确的 sql 以创建触发器。
class Trigger(object):
"""Trigger template wrapper for use with peewee ORM."""
_template = """
{create} {name} {when} {trigger_op}
on {tablename}
begin
{op} {tablename} {sql} where {pk} = {old_new}.{pk};
end;
"""
def __init__(self, table, name, when, trigger_op, op, sql, safe=True):
self.create = 'create trigger' + (' if not exists' if safe else '')
self.tablename = table._meta.name
self.pk = table._meta.primary_key.name
self.name = name
self.when = when
self.trigger_op = trigger_op
self.op = op
self.sql = sql
self.old_new = 'new' if trigger_op.lower() == 'insert' else 'old'
def __str__(self):
return self._template.format(**self.__dict__)
接下来我们定义一个继承自BaseModel
的classTriggerTable
。此 class 覆盖默认 create_table
以跟随 table 创建触发器创建。如果任何触发器创建失败,则整个创建回滚。
class TriggerTable(BaseModel):
"""Table with triggers."""
@classmethod
def triggers(cls):
"""Return an iterable of `Trigger` objects to create upon table creation."""
return tuple()
@classmethod
def new_trigger(cls, name, when, trigger_op, op, sql):
"""Create a new trigger for this class's table."""
return Trigger(cls, name, when, trigger_op, op, sql)
@classmethod
def create_table(cls, fail_silently=False):
"""Create this table in the underlying database."""
super(TriggerTable, cls).create_table(fail_silently)
for trigger in cls.triggers():
try:
cls._meta.database.execute_sql(str(trigger))
except:
cls._meta.database.drop_table(cls, fail_silently)
raise
下一步是创建 class BetterDateTimeField
。如果 default
实例变量设置为 datetime.datetime.now
函数,此 Field
对象将覆盖默认 __ddl__
以附加 "DEFAULT current_timestamp" 字符串。当然有更好的方法来做到这一点,但这个方法抓住了基本用例。
class BetterDateTimeField(pw.DateTimeField):
"""Propogate defaults to database layer."""
def __ddl__(self, column_type):
"""Return a list of Node instances that defines the column."""
ddl = super(BetterDateTimeField, self).__ddl__(column_type)
if self.default == datetime.datetime.now:
ddl.append(pw.SQL('DEFAULT current_timestamp'))
return ddl
最后,我们定义了新的和改进的 KeyValuePair
模型,结合了我们的触发器和日期时间字段改进。我们通过创建 table.
来结束 Python 代码
class KeyValuePair(TriggerTable):
"""DurableHashMap entries are key-value pairs."""
key = pw.FixedCharField(primary_key=True, max_length=255)
val = pw.TextField(null=False)
fup = BetterDateTimeField(
verbose_name='first_updated', null=False, default=datetime.datetime.now)
lup = BetterDateTimeField(
verbose_name='last_updated', null=False, default=datetime.datetime.now)
@classmethod
def triggers(cls):
return (
cls.new_trigger(
'kvp_first_insert', 'after', 'insert', 'update',
'set lup = current_timestamp'),
cls.new_trigger(
'kvp_last_udpated', 'after', 'update', 'update',
'set lup = current_timestamp')
)
KeyValuePair.create_table()
现在架构已正确创建:
sqlite> .schema keyvaluepair
CREATE TABLE "keyvaluepair" ("key" CHAR(255) NOT NULL PRIMARY KEY, "val" TEXT NOT NULL, "fup" DATETIME NOT NULL DEFAULT current_timestamp, "lup" DATETIME NOT NULL DEFAULT current_timestamp);
CREATE TRIGGER kvp_first_insert after insert
on keyvaluepair
begin
update keyvaluepair set lup = current_timestamp where key = new.key;
end;
CREATE TRIGGER kvp_last_udpated after update
on keyvaluepair
begin
update keyvaluepair set lup = current_timestamp where key = old.key;
end;
sqlite> insert into keyvaluepair (key, val) values ('test', 'test-value');
sqlite> select * from keyvaluepair;
test|test-value|2015-12-07 21:58:05|2015-12-07 21:58:05
sqlite> update keyvaluepair set val = 'test-value-two' where key = 'test';
sqlite> select * from keyvaluepair;
test|test-value-two|2015-12-07 21:58:05|2015-12-07 21:58:22
我前一段时间偶然发现了这个问题,并花了一些时间想出一个优化设计来支持 PeeWee 中的触发器(受上述答案的启发)。我对我们最终如何实施它感到非常满意,并想分享这一点。在某些时候,我会为此对 Peewee 进行 PR。
在 PeeWee 中创建触发器和触发器监听器
Objective
本文档分两部分介绍如何执行此操作:
- 如何将
Trigger
添加到数据库中的模型。
- 如何创建一个
ListenThread
将有一个回调函数,每次 table 更新时都会收到通知。
实施方法
这种设计的美妙之处在于您只需要一项:TriggerModelMixin 模型。然后很容易为 subscribe/have 回调方法创建侦听器。
TriggerModelMixin 可以复制粘贴为:
class TriggerModelMixin(Model):
""" PeeWee Model with support for triggers.
This will create a trigger that on all table updates will send
a NOTIFY to {tablename}_updates.
Note that it will also take care of updating the triggers as
appropriate/necesary.
"""
_template = """
CREATE OR REPLACE FUNCTION {function_name}()
RETURNS trigger AS
$BODY$
BEGIN
PERFORM pg_notify(
CAST('{notify_channel_name}' AS text),
row_to_json(NEW)::text);
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
ALTER FUNCTION {function_name}() OWNER TO postgres;
DROP TRIGGER IF EXISTS {trigger_name} ON "{tablename}";
CREATE TRIGGER {trigger_name}
AFTER INSERT OR UPDATE OR DELETE
ON "{tablename}"
{frequency}
EXECUTE PROCEDURE {function_name}();
"""
function_name_template = "{table_name}updatesfunction"
trigger_name_template = "{table_name}updatestrigger"
notify_channel_name_template = "{table_name}updates"
frequency = "FOR EACH ROW"
@classmethod
def get_notify_channel(cls):
table_name = cls._meta.table_name
return cls.notify_channel_name_template.format(**{"table_name": table_name})
@classmethod
def create_table(cls, fail_silently=False):
""" Create table and triggers """
super(TriggerModelMixin, cls).create_table()
table_name = cls._meta.table_name
notify_channel = cls.get_notify_channel()
function_name = cls.function_name_template.format(**{"table_name": table_name})
trigger_name = cls.trigger_name_template.format(**{"table_name": table_name})
trigger = cls._template.format(**{
"function_name": function_name,
"trigger_name": trigger_name,
"notify_channel_name": notify_channel,
"tablename": table_name,
"frequency": cls.frequency
}
)
logger.info(f"Creating Triggers for {cls}")
cls._meta.database.execute_sql(str(trigger))
@classmethod
def create_db_listener(cls):
''' Returns an object that will listen to the database notify channel
and call a specified callback function if triggered.
'''
class Trigger_Listener:
def __init__(self, db_model):
self.db_model = db_model
self.running = True
self.test_mode = False
self.channel_name = ""
def stop(self):
self.running = False
def listen_and_call(self, f, *args, timeout: int = 5, sync=False):
''' Start listening and call the callback method `f` if a
trigger notify is received.
This has two styles: sync (blocking) and async (non-blocking)
Note that `f` must have `record` as a keyword parameter - this
will be the record that sent the notification.
'''
if sync:
return self.listen_and_call_sync(f, *args, timeout=timeout)
else:
t = threading.Thread(
target=self.listen_and_call_sync,
args=(f, *args),
kwargs={'timeout': timeout}
)
t.start()
def listen_and_call_sync(self, f, *args, timeout: int = 5):
''' Call callback function `f` when the channel is notified. '''
self.channel_name = self.db_model.get_notify_channel()
db = self.db_model._meta.database
db.execute_sql(f"LISTEN {self.channel_name};")
conn = db.connection()
while self.running:
# The if see's if the response is non-null
if not select.select([conn], [], [], timeout) == ([], [], []):
# Wait for the bytes to become fully available in the buffer
conn.poll()
while conn.notifies:
record = conn.notifies.pop(0)
logger.info(f"Trigger recieved with record {record}")
f(*args, record=record)
if self.test_mode:
break
return Trigger_Listener(cls)
示例实现:
db_listener = FPGExchangeOrder.create_db_listener()
def callback_method(record=None):
# CallBack Method to handle the record.
logger.info(f"DB update on record: f{record}")
# Handle the update here
db_listener.listen_and_call(callback_method)
如何使用这个
1。向数据库中的模型添加触发器
这很容易。只需将 mixin TriggerModelMixin 添加到您想要添加支持的模型中。此 Mixin 将处理触发器的创建,以及在调用触发器时通知的监听方法。
2。创建一个 ListenThread 以进行回调
我们有两种侦听器模式:async
(非阻塞)和sync
(阻塞)。默认情况下,它将是 non-blocking
,如果您希望它被阻塞,您可以使用 sync=True
更改它。
要使用它(在任何一种情况下),请创建一个回调方法。请注意,此回调方法将在收到更新时阻塞(记录是串行处理的),因此请不要在该方法中负载过重或 I/O。此方法的唯一要求是 record
的键控参数 - 这将是数据库中的记录作为字典返回的地方。
由此,只需创建侦听器,然后调用 listen_and_call
。
我有一个 SQLite table 定义如下:
create table if not exists KeyValuePair (
key CHAR(255) primary key not null,
val text not null,
fup timestamp default current_timestamp not null, -- time of first upload
lup timestamp default current_timestamp not null -- time of last upload
);
create trigger if not exists entry_first_insert after insert
on KeyValuePair
begin
update KeyValuePair set lup = current_timestamp where key = new.key;
end;
create trigger if not exists entry_last_updated after update of value
on KeyValuePair
begin
update KeyValuePair set lup = current_timestamp where key = old.key;
end;
我正在尝试为Python中的这个table写一个peewee.Model
。这是我目前所拥有的:
import peewee as pw
db = pw.SqliteDatabase('dhm.db')
class BaseModel(pw.Model):
class Meta:
database = db
class KeyValuePair(BaseModel):
key = pw.FixedCharField(primary_key=True, max_length=255)
val = pw.TextField(null=False)
fup = pw.DateTimeField(
verbose_name='first_updated', null=False, default=datetime.datetime.now)
lup = pw.DateTimeField(
verbose_name='last_updated', null=False, default=datetime.datetime.now)
db.connect()
db.create_tables([KeyValuePair])
当我检查最后一行生成的 SQL 时,我得到:
CREATE TABLE "keyvaluepair" (
"key" CHAR(255) NOT NULL PRIMARY KEY,
"val" TEXT NOT NULL,
"fup" DATETIME NOT NULL,
"lup" DATETIME NOT NULL
);
所以我现在有两个问题:
- 我一直无法找到一种方法来实现
entry_first_insert
和entry_last_updated
触发器的行为。peewee
支持触发器吗?如果没有,有没有办法从 .sql 文件而不是Model
class 定义创建 table? - 有没有办法使
fup
和lup
的默认值传播到 SQL 定义?
您可以覆盖插入时间戳的模型的保存功能。有关示例,请参阅 TimeStampModel。
我已经找到了两个问题的正确答案。该解决方案实际上在 SQL DDL 中强制执行所需的触发器和默认时间戳。
首先我们定义一个方便的 class 来包装触发器的 SQL。对于 peewee.Node
对象,有一种更合适的方法来执行此操作,但我没有时间为这个项目深入研究所有这些。此 Trigger
class 只是提供字符串格式以输出正确的 sql 以创建触发器。
class Trigger(object):
"""Trigger template wrapper for use with peewee ORM."""
_template = """
{create} {name} {when} {trigger_op}
on {tablename}
begin
{op} {tablename} {sql} where {pk} = {old_new}.{pk};
end;
"""
def __init__(self, table, name, when, trigger_op, op, sql, safe=True):
self.create = 'create trigger' + (' if not exists' if safe else '')
self.tablename = table._meta.name
self.pk = table._meta.primary_key.name
self.name = name
self.when = when
self.trigger_op = trigger_op
self.op = op
self.sql = sql
self.old_new = 'new' if trigger_op.lower() == 'insert' else 'old'
def __str__(self):
return self._template.format(**self.__dict__)
接下来我们定义一个继承自BaseModel
的classTriggerTable
。此 class 覆盖默认 create_table
以跟随 table 创建触发器创建。如果任何触发器创建失败,则整个创建回滚。
class TriggerTable(BaseModel):
"""Table with triggers."""
@classmethod
def triggers(cls):
"""Return an iterable of `Trigger` objects to create upon table creation."""
return tuple()
@classmethod
def new_trigger(cls, name, when, trigger_op, op, sql):
"""Create a new trigger for this class's table."""
return Trigger(cls, name, when, trigger_op, op, sql)
@classmethod
def create_table(cls, fail_silently=False):
"""Create this table in the underlying database."""
super(TriggerTable, cls).create_table(fail_silently)
for trigger in cls.triggers():
try:
cls._meta.database.execute_sql(str(trigger))
except:
cls._meta.database.drop_table(cls, fail_silently)
raise
下一步是创建 class BetterDateTimeField
。如果 default
实例变量设置为 datetime.datetime.now
函数,此 Field
对象将覆盖默认 __ddl__
以附加 "DEFAULT current_timestamp" 字符串。当然有更好的方法来做到这一点,但这个方法抓住了基本用例。
class BetterDateTimeField(pw.DateTimeField):
"""Propogate defaults to database layer."""
def __ddl__(self, column_type):
"""Return a list of Node instances that defines the column."""
ddl = super(BetterDateTimeField, self).__ddl__(column_type)
if self.default == datetime.datetime.now:
ddl.append(pw.SQL('DEFAULT current_timestamp'))
return ddl
最后,我们定义了新的和改进的 KeyValuePair
模型,结合了我们的触发器和日期时间字段改进。我们通过创建 table.
class KeyValuePair(TriggerTable):
"""DurableHashMap entries are key-value pairs."""
key = pw.FixedCharField(primary_key=True, max_length=255)
val = pw.TextField(null=False)
fup = BetterDateTimeField(
verbose_name='first_updated', null=False, default=datetime.datetime.now)
lup = BetterDateTimeField(
verbose_name='last_updated', null=False, default=datetime.datetime.now)
@classmethod
def triggers(cls):
return (
cls.new_trigger(
'kvp_first_insert', 'after', 'insert', 'update',
'set lup = current_timestamp'),
cls.new_trigger(
'kvp_last_udpated', 'after', 'update', 'update',
'set lup = current_timestamp')
)
KeyValuePair.create_table()
现在架构已正确创建:
sqlite> .schema keyvaluepair
CREATE TABLE "keyvaluepair" ("key" CHAR(255) NOT NULL PRIMARY KEY, "val" TEXT NOT NULL, "fup" DATETIME NOT NULL DEFAULT current_timestamp, "lup" DATETIME NOT NULL DEFAULT current_timestamp);
CREATE TRIGGER kvp_first_insert after insert
on keyvaluepair
begin
update keyvaluepair set lup = current_timestamp where key = new.key;
end;
CREATE TRIGGER kvp_last_udpated after update
on keyvaluepair
begin
update keyvaluepair set lup = current_timestamp where key = old.key;
end;
sqlite> insert into keyvaluepair (key, val) values ('test', 'test-value');
sqlite> select * from keyvaluepair;
test|test-value|2015-12-07 21:58:05|2015-12-07 21:58:05
sqlite> update keyvaluepair set val = 'test-value-two' where key = 'test';
sqlite> select * from keyvaluepair;
test|test-value-two|2015-12-07 21:58:05|2015-12-07 21:58:22
我前一段时间偶然发现了这个问题,并花了一些时间想出一个优化设计来支持 PeeWee 中的触发器(受上述答案的启发)。我对我们最终如何实施它感到非常满意,并想分享这一点。在某些时候,我会为此对 Peewee 进行 PR。
在 PeeWee 中创建触发器和触发器监听器
Objective
本文档分两部分介绍如何执行此操作:
- 如何将
Trigger
添加到数据库中的模型。 - 如何创建一个
ListenThread
将有一个回调函数,每次 table 更新时都会收到通知。
实施方法
这种设计的美妙之处在于您只需要一项:TriggerModelMixin 模型。然后很容易为 subscribe/have 回调方法创建侦听器。
TriggerModelMixin 可以复制粘贴为:
class TriggerModelMixin(Model):
""" PeeWee Model with support for triggers.
This will create a trigger that on all table updates will send
a NOTIFY to {tablename}_updates.
Note that it will also take care of updating the triggers as
appropriate/necesary.
"""
_template = """
CREATE OR REPLACE FUNCTION {function_name}()
RETURNS trigger AS
$BODY$
BEGIN
PERFORM pg_notify(
CAST('{notify_channel_name}' AS text),
row_to_json(NEW)::text);
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
ALTER FUNCTION {function_name}() OWNER TO postgres;
DROP TRIGGER IF EXISTS {trigger_name} ON "{tablename}";
CREATE TRIGGER {trigger_name}
AFTER INSERT OR UPDATE OR DELETE
ON "{tablename}"
{frequency}
EXECUTE PROCEDURE {function_name}();
"""
function_name_template = "{table_name}updatesfunction"
trigger_name_template = "{table_name}updatestrigger"
notify_channel_name_template = "{table_name}updates"
frequency = "FOR EACH ROW"
@classmethod
def get_notify_channel(cls):
table_name = cls._meta.table_name
return cls.notify_channel_name_template.format(**{"table_name": table_name})
@classmethod
def create_table(cls, fail_silently=False):
""" Create table and triggers """
super(TriggerModelMixin, cls).create_table()
table_name = cls._meta.table_name
notify_channel = cls.get_notify_channel()
function_name = cls.function_name_template.format(**{"table_name": table_name})
trigger_name = cls.trigger_name_template.format(**{"table_name": table_name})
trigger = cls._template.format(**{
"function_name": function_name,
"trigger_name": trigger_name,
"notify_channel_name": notify_channel,
"tablename": table_name,
"frequency": cls.frequency
}
)
logger.info(f"Creating Triggers for {cls}")
cls._meta.database.execute_sql(str(trigger))
@classmethod
def create_db_listener(cls):
''' Returns an object that will listen to the database notify channel
and call a specified callback function if triggered.
'''
class Trigger_Listener:
def __init__(self, db_model):
self.db_model = db_model
self.running = True
self.test_mode = False
self.channel_name = ""
def stop(self):
self.running = False
def listen_and_call(self, f, *args, timeout: int = 5, sync=False):
''' Start listening and call the callback method `f` if a
trigger notify is received.
This has two styles: sync (blocking) and async (non-blocking)
Note that `f` must have `record` as a keyword parameter - this
will be the record that sent the notification.
'''
if sync:
return self.listen_and_call_sync(f, *args, timeout=timeout)
else:
t = threading.Thread(
target=self.listen_and_call_sync,
args=(f, *args),
kwargs={'timeout': timeout}
)
t.start()
def listen_and_call_sync(self, f, *args, timeout: int = 5):
''' Call callback function `f` when the channel is notified. '''
self.channel_name = self.db_model.get_notify_channel()
db = self.db_model._meta.database
db.execute_sql(f"LISTEN {self.channel_name};")
conn = db.connection()
while self.running:
# The if see's if the response is non-null
if not select.select([conn], [], [], timeout) == ([], [], []):
# Wait for the bytes to become fully available in the buffer
conn.poll()
while conn.notifies:
record = conn.notifies.pop(0)
logger.info(f"Trigger recieved with record {record}")
f(*args, record=record)
if self.test_mode:
break
return Trigger_Listener(cls)
示例实现:
db_listener = FPGExchangeOrder.create_db_listener()
def callback_method(record=None):
# CallBack Method to handle the record.
logger.info(f"DB update on record: f{record}")
# Handle the update here
db_listener.listen_and_call(callback_method)
如何使用这个
1。向数据库中的模型添加触发器
这很容易。只需将 mixin TriggerModelMixin 添加到您想要添加支持的模型中。此 Mixin 将处理触发器的创建,以及在调用触发器时通知的监听方法。
2。创建一个 ListenThread 以进行回调
我们有两种侦听器模式:async
(非阻塞)和sync
(阻塞)。默认情况下,它将是 non-blocking
,如果您希望它被阻塞,您可以使用 sync=True
更改它。
要使用它(在任何一种情况下),请创建一个回调方法。请注意,此回调方法将在收到更新时阻塞(记录是串行处理的),因此请不要在该方法中负载过重或 I/O。此方法的唯一要求是 record
的键控参数 - 这将是数据库中的记录作为字典返回的地方。
由此,只需创建侦听器,然后调用 listen_and_call
。