Postgresql分区和sqlalchemy
Postgresql partition and sqlalchemy
SQLAlchemy 文档解释了如何创建分区表(partitioned table),但没有解释如何创建各个分区(partition)。
所以如果我有这个:
#Skipping create_engine and metadata
Base = declarative_base()


class Measure(Base):
    """Parent ``measures`` table, declared as range-partitioned on ``log_date``."""

    __tablename__ = 'measures'
    __table_args__ = {
        # The option name must be a string key; a bare identifier here is an
        # undefined name and fails as soon as the class body executes.
        'postgresql_partition_by': 'RANGE (log_date)',
    }

    # ``nullable=False`` is the Column keyword that emits NOT NULL.
    city_id = Column(Integer, nullable=False)
    log_date = Column(Date, nullable=False)
    peaktemp = Column(Integer)
    unitsales = Column(Integer)
    # NOTE(review): no primary key is declared. The ORM mapper normally needs
    # one, and on a partitioned table it has to include log_date -- confirm
    # the intended key before using this model.
class Measure2020(Base):
    """How am I supposed to declare this?"""
我知道我要做的大部分操作都是 SELECT * FROM measures WHERE log_date BETWEEN XX AND YY。但这个功能似乎很有趣。
您可以使用一个 MeasureMixin 类,让两个类都继承它。然后使用 event 来附加表分区。
from sqlalchemy import event
class MeasureMixin:
    """Column definitions shared by the partitioned parent and its partitions.

    Both the parent table class and every partition class inherit from this
    mixin so that they end up with identical columns.
    """

    # ``nullable=False`` is the Column keyword that emits NOT NULL;
    # ``not_null`` is not an accepted Column argument.
    city_id = Column(Integer, nullable=False)
    log_date = Column(Date, nullable=False)
    peaktemp = Column(Integer)
    unitsales = Column(Integer)
    # NOTE(review): no primary key is declared. The ORM mapper normally needs
    # one, and on a partitioned table it has to include log_date -- confirm.
class Measure(MeasureMixin, Base):
    """Parent ``measures`` table, range-partitioned on ``log_date``."""

    __tablename__ = 'measures'
    __table_args__ = {
        # The option name must be a string key; the bare identifier used
        # previously is an undefined name and raises NameError.
        'postgresql_partition_by': 'RANGE (log_date)',
    }
class Measure2020(MeasureMixin, Base):
    """Table holding the 2020 rows; attached to ``measures`` as a partition."""

    __tablename__ = 'measures2020'


# Make metadata-wide creation build the parent table before this one.
Measure2020.__table__.add_is_dependent_on(Measure.__table__)

# Attach the table as a partition right after it has been created.
# PostgreSQL requires the bound to be introduced with FOR VALUES.
event.listen(
    Measure2020.__table__,
    "after_create",
    DDL("""ALTER TABLE measures ATTACH PARTITION measures2020
FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');""")
)
我遇到了类似的问题。我发现@moshevi 的回答非常有用,并最终对其进行了一些概括(因为我有很多 table 要分区)。
首先,创建一个这样的元类:
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.sql.ddl import DDL
from sqlalchemy import event
class PartitionByYearMeta(DeclarativeMeta):
    """Declarative metaclass that RANGE-partitions a mapped table by year.

    A class built with this metaclass receives:

    * ``postgresql_partition_by='RANGE(<partition_by>)'`` merged into its
      ``__table_args__``;
    * ``partitions``: dict mapping a year key to the generated partition class;
    * ``partitioned_by``: the ``partition_by`` value given as class keyword;
    * ``get_partition_name(key)`` and ``create_partition(key)`` classmethods.
    """

    def __new__(cls, clsname, bases, attrs, *, partition_by):
        @classmethod
        def get_partition_name(cls_, key):
            """Return the table name used for the partition of ``key``."""
            # 'measures' -> 'measures_2020' (customise as needed)
            return f'{cls_.__tablename__}_{key}'

        @classmethod
        def create_partition(cls_, key):
            """Return the partition class for year ``key``, building it once.

            The new class reuses the parent's bases (and therefore its
            columns) and registers DDL that attaches its table to the parent
            right after the table is created.
            """
            if key not in cls_.partitions:
                # Accept 2020 as well as '2020': the upper bound needs
                # integer arithmetic.
                year = int(key)

                Partition = type(
                    f'{clsname}{key}',  # Class name, only used internally
                    bases,
                    {'__tablename__': cls_.get_partition_name(key)},
                )

                # The parent table has to exist before the partition.
                Partition.__table__.add_is_dependent_on(cls_.__table__)

                event.listen(
                    Partition.__table__,
                    'after_create',
                    DDL(
                        # For non-year ranges, modify the FROM and TO below
                        f"""
                        ALTER TABLE {cls_.__tablename__}
                        ATTACH PARTITION {Partition.__tablename__}
                        FOR VALUES FROM ('{year}-01-01') TO ('{year + 1}-01-01');
                        """
                    ),
                )

                cls_.partitions[key] = Partition

            return cls_.partitions[key]

        # __table_args__ may be a dict, a tuple of schema items, or a tuple
        # whose last element is an options dict. Split it so the partition
        # option lands in a single trailing dict instead of relying on tuple
        # concatenation, which fails for the dict form.
        table_args = attrs.get('__table_args__', ())
        if isinstance(table_args, dict):
            positional, options = (), dict(table_args)
        elif table_args and isinstance(table_args[-1], dict):
            positional, options = tuple(table_args[:-1]), dict(table_args[-1])
        else:
            positional, options = tuple(table_args), {}
        # For non-RANGE partitions, modify the value below
        options['postgresql_partition_by'] = f'RANGE({partition_by})'

        attrs.update(
            {
                '__table_args__': positional + (options,),
                'partitions': {},
                'partitioned_by': partition_by,
                'get_partition_name': get_partition_name,
                'create_partition': create_partition,
            }
        )

        return super().__new__(cls, clsname, bases, attrs)

    def __init__(cls, clsname, bases, attrs, *, partition_by=None):
        # The class keyword is consumed here rather than forwarded.
        # NOTE(review): some DeclarativeMeta.__init__ signatures reject extra
        # keywords -- confirm against the SQLAlchemy version in use.
        super().__init__(clsname, bases, attrs)
接下来,对于模型中要分割的任何 table:
class MeasureMixin:
    """Column definitions shared by the parent table and every partition."""

    # The columns need to be pulled out into this mixin
    # Note: any foreign key columns will need to be wrapped like this:

    @declared_attr
    def city_id(cls):
        # A fresh Column is built for each class using the mixin.
        # ``nullable=False`` is the keyword that emits NOT NULL.
        return Column(ForeignKey('cities.id'), nullable=False)

    log_date = Column(Date, nullable=False)
    peaktemp = Column(Integer)
    unitsales = Column(Integer)
# ``partition_by`` must name an existing column: the mixin defines
# ``log_date`` (with underscore), not ``logdate``.
class Measure(MeasureMixin, Base, metaclass=PartitionByYearMeta, partition_by='log_date'):
    """Parent ``measures`` table, range-partitioned by year on ``log_date``."""

    __tablename__ = 'measures'
这样可以轻松添加更多 table 并按任意数量的值进行分区。
动态创建新分区的方式如下:
# Make sure you commit any session that is currently open, even for select queries:
session.commit()

Partition = Measure.create_partition(2020)
# ``checkfirst=True`` lets SQLAlchemy test for the table itself and skip the
# CREATE when it already exists. Calling ``engine.dialect.has_table`` with
# only a table name fails because it also expects a connection argument.
Partition.__table__.create(bind=engine, checkfirst=True)
现在键为 2020 的分区已创建,可以插入该年份的值了。
对于数据库分区(PostgreSQL 或 MySQL),您可以尝试使用 architect 包。它适用于一系列 ORM 库,包括 SQLAlchemy。这是 PostgreSQL 的示例 - https://architect.readthedocs.io/features/partition/postgresql.html。支持多种分区类型,希望能满足您的需求。
可能有点晚了,但我想分享我在@moshevi 和@Seb 的回答基础上所做的工作:
在我的物联网用例中,我需要真正的子分区(第一级按 year,第二级按 nodeid)。我也想把它再稍微泛化一下。
这是我想出的:
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.sql.ddl import DDL
from sqlalchemy import event
class PartitionByMeta(DeclarativeMeta):
    """Declarative metaclass for (sub-)partitioned PostgreSQL tables.

    Class keywords:
        partition_by: column expression to partition on, or ``None`` for a
            leaf table that is not partitioned further.
        partition_type: partitioning method name such as ``'RANGE'`` or
            ``'LIST'``; required whenever ``partition_by`` is given.

    When ``partition_by`` is given, the class receives a ``partitions`` dict,
    a ``partitioned_by`` attribute and the ``get_partition_name`` /
    ``create_partition`` classmethods, and ``postgresql_partition_by`` is
    merged into its ``__table_args__``.
    """

    def __new__(cls, clsname, bases, attrs, *, partition_by=None, partition_type=None):
        @classmethod
        def get_partition_name(cls_, suffix):
            """Return the table name of the partition identified by ``suffix``."""
            return f'{cls_.__tablename__}_{suffix}'

        @classmethod
        def create_partition(cls_, suffix, partition_stmt, subpartition_by=None, subpartition_type=None):
            """Return the partition class for ``suffix``, building it once.

            Args:
                suffix: appended to the parent's table and class names.
                partition_stmt: bound clause placed after ATTACH PARTITION,
                    e.g. ``FOR VALUES IN (...)`` or ``DEFAULT``.
                subpartition_by: column to sub-partition the new table on.
                subpartition_type: partitioning method for the sub-partition.
            """
            if suffix not in cls_.partitions:
                partition = PartitionByMeta(
                    f'{clsname}{suffix}',
                    bases,
                    {'__tablename__': cls_.get_partition_name(suffix)},
                    partition_type=subpartition_type,
                    partition_by=subpartition_by,
                )

                # The parent table has to exist before the partition.
                partition.__table__.add_is_dependent_on(cls_.__table__)

                event.listen(
                    partition.__table__,
                    'after_create',
                    DDL(
                        # For non-year ranges, modify the FROM and TO below
                        # LIST: IN ('first', 'second');
                        # RANGE: FROM ('{key}-01-01') TO ('{key+1}-01-01')
                        f"""
                        ALTER TABLE {cls_.__tablename__}
                        ATTACH PARTITION {partition.__tablename__}
                        {partition_stmt};
                        """
                    ),
                )

                cls_.partitions[suffix] = partition

            return cls_.partitions[suffix]

        if partition_by is not None:
            if partition_type is None:
                # Fail with a clear message instead of an AttributeError
                # from calling .upper() on None.
                raise TypeError(
                    'partition_type is required when partition_by is given'
                )

            # __table_args__ may be a dict, a tuple of schema items, or a
            # tuple whose last element is an options dict. Split it so the
            # partition option ends up in a single trailing dict.
            table_args = attrs.get('__table_args__', ())
            if isinstance(table_args, dict):
                positional, options = (), dict(table_args)
            elif table_args and isinstance(table_args[-1], dict):
                positional, options = tuple(table_args[:-1]), dict(table_args[-1])
            else:
                positional, options = tuple(table_args), {}
            options['postgresql_partition_by'] = f'{partition_type.upper()}({partition_by})'

            attrs.update(
                {
                    '__table_args__': positional + (options,),
                    'partitions': {},
                    'partitioned_by': partition_by,
                    'get_partition_name': get_partition_name,
                    'create_partition': create_partition,
                }
            )

        return super().__new__(cls, clsname, bases, attrs)

    def __init__(cls, clsname, bases, attrs, *, partition_by=None, partition_type=None):
        # The class keywords are consumed here rather than forwarded.
        # NOTE(review): some DeclarativeMeta.__init__ signatures reject extra
        # keywords -- confirm against the SQLAlchemy version in use.
        super().__init__(clsname, bases, attrs)
用法如下,假设已按照 @moshevi 介绍的方式创建了相应的 VehicleDataMixin 类:
class VehicleData(
    VehicleDataMixin,
    Project,
    metaclass=PartitionByMeta,
    partition_by="timestamp",
    partition_type="RANGE",
):
    # Parent table, RANGE-partitioned on "timestamp"; carries a BRIN index
    # and a unique constraint over (timestamp, nodeid, channelid).
    __tablename__ = "vehicle_data"
    __table_args__ = (
        Index(
            "ts_ch_nod_idx",
            "timestamp",
            "nodeid",
            "channelid",
            postgresql_using="brin",
        ),
        UniqueConstraint(
            "timestamp",
            "nodeid",
            "channelid",
            name="ts_ch_nod_constr",
        ),
    )
然后可以像这样迭代地对其进行子分区(待改编)
for year in range(2017, 2021):
    # One RANGE partition per year, itself LIST-partitioned on nodeid.
    year_bounds = f"""FOR VALUES FROM ('{year}-01-01') TO ('{year+1}-01-01')"""
    year_partition = VehicleData.create_partition(
        str(year),
        partition_stmt=year_bounds,
        subpartition_by='nodeid',
        subpartition_type='LIST',
    )

    # One LIST sub-partition per known nodeid within this year.
    for node_id in {3, 4, 7, 9}:
        year_partition.create_partition(
            f"nid{node_id}",
            partition_stmt=f"""FOR VALUES IN ('{node_id}')""",
        )

    # Catch-all sub-partition for any other nodeid in this year.
    year_partition.create_partition("def", partition_stmt="DEFAULT")

# Catch-all partition for rows outside the anticipated years.
VehicleData.create_partition("def", partition_stmt="DEFAULT")
partition_by='timestamp' <= 这是用于分区的列
partition_type='RANGE' <= 这是(PostgreSQL 特有的)分区类型
partition_stmt=f"""FOR VALUES IN ('{i}')""" <= 这是(PostgreSQL 特有的)分区语句
SQLAlchemy 文档解释了如何创建分区表(partitioned table),但没有解释如何创建各个分区(partition)。
所以如果我有这个:
#Skipping create_engine and metadata
Base = declarative_base()


class Measure(Base):
    """Parent ``measures`` table, declared as range-partitioned on ``log_date``."""

    __tablename__ = 'measures'
    __table_args__ = {
        # The option name must be a string key; a bare identifier here is an
        # undefined name and fails as soon as the class body executes.
        'postgresql_partition_by': 'RANGE (log_date)',
    }

    # ``nullable=False`` is the Column keyword that emits NOT NULL.
    city_id = Column(Integer, nullable=False)
    log_date = Column(Date, nullable=False)
    peaktemp = Column(Integer)
    unitsales = Column(Integer)
    # NOTE(review): no primary key is declared. The ORM mapper normally needs
    # one, and on a partitioned table it has to include log_date -- confirm
    # the intended key before using this model.
class Measure2020(Base):
    """How am I supposed to declare this?"""
我知道我要做的大部分操作都是 SELECT * FROM measures WHERE log_date BETWEEN XX AND YY。但这个功能似乎很有趣。
您可以使用一个 MeasureMixin 类,让两个类都继承它。然后使用 event 来附加表分区。
from sqlalchemy import event
class MeasureMixin:
    """Column definitions shared by the partitioned parent and its partitions.

    Both the parent table class and every partition class inherit from this
    mixin so that they end up with identical columns.
    """

    # ``nullable=False`` is the Column keyword that emits NOT NULL;
    # ``not_null`` is not an accepted Column argument.
    city_id = Column(Integer, nullable=False)
    log_date = Column(Date, nullable=False)
    peaktemp = Column(Integer)
    unitsales = Column(Integer)
    # NOTE(review): no primary key is declared. The ORM mapper normally needs
    # one, and on a partitioned table it has to include log_date -- confirm.
class Measure(MeasureMixin, Base):
    """Parent ``measures`` table, range-partitioned on ``log_date``."""

    __tablename__ = 'measures'
    __table_args__ = {
        # The option name must be a string key; the bare identifier used
        # previously is an undefined name and raises NameError.
        'postgresql_partition_by': 'RANGE (log_date)',
    }
class Measure2020(MeasureMixin, Base):
    """Table holding the 2020 rows; attached to ``measures`` as a partition."""

    __tablename__ = 'measures2020'


# Make metadata-wide creation build the parent table before this one.
Measure2020.__table__.add_is_dependent_on(Measure.__table__)

# Attach the table as a partition right after it has been created.
# PostgreSQL requires the bound to be introduced with FOR VALUES.
event.listen(
    Measure2020.__table__,
    "after_create",
    DDL("""ALTER TABLE measures ATTACH PARTITION measures2020
FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');""")
)
我遇到了类似的问题。我发现@moshevi 的回答非常有用,并最终对其进行了一些概括(因为我有很多 table 要分区)。
首先,创建一个这样的元类:
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.sql.ddl import DDL
from sqlalchemy import event
class PartitionByYearMeta(DeclarativeMeta):
    """Declarative metaclass that RANGE-partitions a mapped table by year.

    A class built with this metaclass receives:

    * ``postgresql_partition_by='RANGE(<partition_by>)'`` merged into its
      ``__table_args__``;
    * ``partitions``: dict mapping a year key to the generated partition class;
    * ``partitioned_by``: the ``partition_by`` value given as class keyword;
    * ``get_partition_name(key)`` and ``create_partition(key)`` classmethods.
    """

    def __new__(cls, clsname, bases, attrs, *, partition_by):
        @classmethod
        def get_partition_name(cls_, key):
            """Return the table name used for the partition of ``key``."""
            # 'measures' -> 'measures_2020' (customise as needed)
            return f'{cls_.__tablename__}_{key}'

        @classmethod
        def create_partition(cls_, key):
            """Return the partition class for year ``key``, building it once.

            The new class reuses the parent's bases (and therefore its
            columns) and registers DDL that attaches its table to the parent
            right after the table is created.
            """
            if key not in cls_.partitions:
                # Accept 2020 as well as '2020': the upper bound needs
                # integer arithmetic.
                year = int(key)

                Partition = type(
                    f'{clsname}{key}',  # Class name, only used internally
                    bases,
                    {'__tablename__': cls_.get_partition_name(key)},
                )

                # The parent table has to exist before the partition.
                Partition.__table__.add_is_dependent_on(cls_.__table__)

                event.listen(
                    Partition.__table__,
                    'after_create',
                    DDL(
                        # For non-year ranges, modify the FROM and TO below
                        f"""
                        ALTER TABLE {cls_.__tablename__}
                        ATTACH PARTITION {Partition.__tablename__}
                        FOR VALUES FROM ('{year}-01-01') TO ('{year + 1}-01-01');
                        """
                    ),
                )

                cls_.partitions[key] = Partition

            return cls_.partitions[key]

        # __table_args__ may be a dict, a tuple of schema items, or a tuple
        # whose last element is an options dict. Split it so the partition
        # option lands in a single trailing dict instead of relying on tuple
        # concatenation, which fails for the dict form.
        table_args = attrs.get('__table_args__', ())
        if isinstance(table_args, dict):
            positional, options = (), dict(table_args)
        elif table_args and isinstance(table_args[-1], dict):
            positional, options = tuple(table_args[:-1]), dict(table_args[-1])
        else:
            positional, options = tuple(table_args), {}
        # For non-RANGE partitions, modify the value below
        options['postgresql_partition_by'] = f'RANGE({partition_by})'

        attrs.update(
            {
                '__table_args__': positional + (options,),
                'partitions': {},
                'partitioned_by': partition_by,
                'get_partition_name': get_partition_name,
                'create_partition': create_partition,
            }
        )

        return super().__new__(cls, clsname, bases, attrs)

    def __init__(cls, clsname, bases, attrs, *, partition_by=None):
        # The class keyword is consumed here rather than forwarded.
        # NOTE(review): some DeclarativeMeta.__init__ signatures reject extra
        # keywords -- confirm against the SQLAlchemy version in use.
        super().__init__(clsname, bases, attrs)
接下来,对于模型中要分割的任何 table:
class MeasureMixin:
    """Column definitions shared by the parent table and every partition."""

    # The columns need to be pulled out into this mixin
    # Note: any foreign key columns will need to be wrapped like this:

    @declared_attr
    def city_id(cls):
        # A fresh Column is built for each class using the mixin.
        # ``nullable=False`` is the keyword that emits NOT NULL.
        return Column(ForeignKey('cities.id'), nullable=False)

    log_date = Column(Date, nullable=False)
    peaktemp = Column(Integer)
    unitsales = Column(Integer)
# ``partition_by`` must name an existing column: the mixin defines
# ``log_date`` (with underscore), not ``logdate``.
class Measure(MeasureMixin, Base, metaclass=PartitionByYearMeta, partition_by='log_date'):
    """Parent ``measures`` table, range-partitioned by year on ``log_date``."""

    __tablename__ = 'measures'
这样可以轻松添加更多 table 并按任意数量的值进行分区。
动态创建新分区的方式如下:
# Make sure you commit any session that is currently open, even for select queries:
session.commit()

Partition = Measure.create_partition(2020)
# ``checkfirst=True`` lets SQLAlchemy test for the table itself and skip the
# CREATE when it already exists. Calling ``engine.dialect.has_table`` with
# only a table name fails because it also expects a connection argument.
Partition.__table__.create(bind=engine, checkfirst=True)
现在键为 2020 的分区已创建,可以插入该年份的值了。
对于数据库分区(PostgreSQL 或 MySQL),您可以尝试使用 architect 包。它适用于一系列 ORM 库,包括 SQLAlchemy。这是 PostgreSQL 的示例 - https://architect.readthedocs.io/features/partition/postgresql.html。支持多种分区类型,希望能满足您的需求。
可能有点晚了,但我想分享我在@moshevi 和@Seb 的回答基础上所做的工作:
在我的物联网用例中,我需要真正的子分区(第一级按 year,第二级按 nodeid)。我也想把它再稍微泛化一下。
这是我想出的:
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.sql.ddl import DDL
from sqlalchemy import event
class PartitionByMeta(DeclarativeMeta):
    """Declarative metaclass for (sub-)partitioned PostgreSQL tables.

    Class keywords:
        partition_by: column expression to partition on, or ``None`` for a
            leaf table that is not partitioned further.
        partition_type: partitioning method name such as ``'RANGE'`` or
            ``'LIST'``; required whenever ``partition_by`` is given.

    When ``partition_by`` is given, the class receives a ``partitions`` dict,
    a ``partitioned_by`` attribute and the ``get_partition_name`` /
    ``create_partition`` classmethods, and ``postgresql_partition_by`` is
    merged into its ``__table_args__``.
    """

    def __new__(cls, clsname, bases, attrs, *, partition_by=None, partition_type=None):
        @classmethod
        def get_partition_name(cls_, suffix):
            """Return the table name of the partition identified by ``suffix``."""
            return f'{cls_.__tablename__}_{suffix}'

        @classmethod
        def create_partition(cls_, suffix, partition_stmt, subpartition_by=None, subpartition_type=None):
            """Return the partition class for ``suffix``, building it once.

            Args:
                suffix: appended to the parent's table and class names.
                partition_stmt: bound clause placed after ATTACH PARTITION,
                    e.g. ``FOR VALUES IN (...)`` or ``DEFAULT``.
                subpartition_by: column to sub-partition the new table on.
                subpartition_type: partitioning method for the sub-partition.
            """
            if suffix not in cls_.partitions:
                partition = PartitionByMeta(
                    f'{clsname}{suffix}',
                    bases,
                    {'__tablename__': cls_.get_partition_name(suffix)},
                    partition_type=subpartition_type,
                    partition_by=subpartition_by,
                )

                # The parent table has to exist before the partition.
                partition.__table__.add_is_dependent_on(cls_.__table__)

                event.listen(
                    partition.__table__,
                    'after_create',
                    DDL(
                        # For non-year ranges, modify the FROM and TO below
                        # LIST: IN ('first', 'second');
                        # RANGE: FROM ('{key}-01-01') TO ('{key+1}-01-01')
                        f"""
                        ALTER TABLE {cls_.__tablename__}
                        ATTACH PARTITION {partition.__tablename__}
                        {partition_stmt};
                        """
                    ),
                )

                cls_.partitions[suffix] = partition

            return cls_.partitions[suffix]

        if partition_by is not None:
            if partition_type is None:
                # Fail with a clear message instead of an AttributeError
                # from calling .upper() on None.
                raise TypeError(
                    'partition_type is required when partition_by is given'
                )

            # __table_args__ may be a dict, a tuple of schema items, or a
            # tuple whose last element is an options dict. Split it so the
            # partition option ends up in a single trailing dict.
            table_args = attrs.get('__table_args__', ())
            if isinstance(table_args, dict):
                positional, options = (), dict(table_args)
            elif table_args and isinstance(table_args[-1], dict):
                positional, options = tuple(table_args[:-1]), dict(table_args[-1])
            else:
                positional, options = tuple(table_args), {}
            options['postgresql_partition_by'] = f'{partition_type.upper()}({partition_by})'

            attrs.update(
                {
                    '__table_args__': positional + (options,),
                    'partitions': {},
                    'partitioned_by': partition_by,
                    'get_partition_name': get_partition_name,
                    'create_partition': create_partition,
                }
            )

        return super().__new__(cls, clsname, bases, attrs)

    def __init__(cls, clsname, bases, attrs, *, partition_by=None, partition_type=None):
        # The class keywords are consumed here rather than forwarded.
        # NOTE(review): some DeclarativeMeta.__init__ signatures reject extra
        # keywords -- confirm against the SQLAlchemy version in use.
        super().__init__(clsname, bases, attrs)
用法如下,假设已按照 @moshevi 介绍的方式创建了相应的 VehicleDataMixin 类:
class VehicleData(
    VehicleDataMixin,
    Project,
    metaclass=PartitionByMeta,
    partition_by="timestamp",
    partition_type="RANGE",
):
    # Parent table, RANGE-partitioned on "timestamp"; carries a BRIN index
    # and a unique constraint over (timestamp, nodeid, channelid).
    __tablename__ = "vehicle_data"
    __table_args__ = (
        Index(
            "ts_ch_nod_idx",
            "timestamp",
            "nodeid",
            "channelid",
            postgresql_using="brin",
        ),
        UniqueConstraint(
            "timestamp",
            "nodeid",
            "channelid",
            name="ts_ch_nod_constr",
        ),
    )
然后可以像这样迭代地对其进行子分区(待改编)
for year in range(2017, 2021):
    # One RANGE partition per year, itself LIST-partitioned on nodeid.
    year_bounds = f"""FOR VALUES FROM ('{year}-01-01') TO ('{year+1}-01-01')"""
    year_partition = VehicleData.create_partition(
        str(year),
        partition_stmt=year_bounds,
        subpartition_by='nodeid',
        subpartition_type='LIST',
    )

    # One LIST sub-partition per known nodeid within this year.
    for node_id in {3, 4, 7, 9}:
        year_partition.create_partition(
            f"nid{node_id}",
            partition_stmt=f"""FOR VALUES IN ('{node_id}')""",
        )

    # Catch-all sub-partition for any other nodeid in this year.
    year_partition.create_partition("def", partition_stmt="DEFAULT")

# Catch-all partition for rows outside the anticipated years.
VehicleData.create_partition("def", partition_stmt="DEFAULT")
partition_by='timestamp' <= 这是用于分区的列
partition_type='RANGE' <= 这是(PostgreSQL 特有的)分区类型
partition_stmt=f"""FOR VALUES IN ('{i}')""" <= 这是(PostgreSQL 特有的)分区语句