Python 的 DDD:我做对了吗?
DDD with Python: did I get it right?
我正在尝试在 Python 项目中使用 域驱动设计 (DDD),但它看起来像很多样板代码。我想我走错了路。
我有三个文件,都为每个用途定义了项目。感觉太多了。此外,我经常转换字典,但我试图将目的分开。
这个主题不应该是基于意见的,因为我正在尝试遵循 DDD 方法并且应该有一个可遵循的模式。
下面代码的相关部分。请仔细查看 ItemRepository
.
/domain/item.py
"""
Vanilla Python class, business level
"""
class ItemDomain:
def __init__(self, name):
self.name = name
@classmethod
def from_dictionary(cls, dictionary):
return cls(name=dictionary['name'])
def to_dictionary(self):
return {'name': self.name }
/model/item.py
"""
Persistent model for SQLAlchemy
"""
class ItemModel(DefaultModel):
__tablename__ = 'items'
name = Column(Text)
/schema/item.py
"""
Schema for Marshmallow
"""
class ItemSchema(Schema):
name = fields.Str(required=True)
/repository/item.py
class ItemRepository:
def get_one(item_id):
# ...
model = session.query(ItemModel).filter_by(item_id=item_id).first()
return ItemDomain.from_dictionary(dict(model))
def add_one(item: ItemDomain):
# ...
item = item.to_dictionary()
ItemSchema().load(item) # validation: will raise an exception if invalid
model = ItemModel()
model.from_dictionary(item)
session.add(model)
# ...
我怎样才能拥有一个没有开销的干净架构?
为了回答您的问题,我开了一个博客 post,您可以在此处找到该博客:https://lukeonpython.blog/2020/04/my-structure-for-ddd-component/。
目前你只有代码片段,稍后我会添加一些描述:-)。
但一般来说,DDD 应该是一个独立的组件,具有用于纯数据对象进行通信的外观。这个外观是一个应用程序服务。在我的例子中,它是命令处理程序和查询处理程序。大多数测试都是使用外观的 BDD 测试。有时领域逻辑复杂,可以在aggregate/UnitOfWork上使用单元测试。
您的应用程序架构将 DDD 元素拆分为不同的包,我不喜欢。使用这种方法,您将失去对组件边界的控制。您需要从该组件获得的所有内容都应导出到 init.py。
通常,它是一个带有命令的命令处理程序。如果您需要数据视图,请查询处理程序。事件侦听器注册可能的事件。
如果您不确定是否需要所有这些东西,您可以从外观上的 BDD 测试和非常简化的内部实现开始。因此,具有直接使用 DTO 的业务逻辑的命令处理程序。以后如果事情变得复杂,您可以轻松重构。但适当的界限是成功的关键。另外,请记住,如果您觉得所有这些元素和代码都是开销,那么您可能不需要 DDD 方法。也许它不符合 DDD 的条件。
这里是一个带有组件包结构代码片段的小示例。我使用这样的东西:
- 迁移/
- app.py
- commands.py
- events.py
- exceptions.py
- repository.py
- service.py
- uow.py
在迁移中,我更喜欢为这个特定组件使用带分支的 alembic。所以不会对项目中的其他组件产生依赖。
app.py 是容器依赖注入的地方。它主要用于向应用程序服务和存储库依赖项注入适当的存储库。
对于其余的模块,我将在此处提供一些片段。
commands.py
@dataclass
class Create(Command):
command_id: CommandID = field(default_factory=uuid1)
timestamp: datetime = field(default_factory=datetime.utcnow
service.py
class CommandHandler:
def __init__(self, repository: Repository) -> None:
self._repository = repository
self._listeners: List[Listener] = []
super().__init__()
def register(self, listener: Listener) -> None:
if listener not in self._listeners:
self._listeners.append(listener)
def unregister(self, listener: Listener) -> None:
if listener in self._listeners:
self._listeners.remove(listener)
@safe
@singledispatchmethod
def handle(self, command: Command) -> Optional[Event]:
uow: UnitOfWork = self._repository.get(command.uow_id)
event: Event = app_event(self._handle(command, uow), command)
for listener in self._listeners:
listener(event)
self._repository.save(uow)
return event
@safe
@handle.register(Create)
def create(self, command: Create) -> Event:
uow = UnitOfWork.create()
self._repository.save(uow)
return Created(command.command_id, uow.id)
@singledispatchmethod
def _handle(self, c: Command, u: UnitOfWork) -> UnitOfWork.Event:
raise NotImplementedError
@_handle.register(UpdateValue)
def _(self, command: UpdateValue, uow: UnitOfWork) -> UnitOfWork.Event:
return uow.update(command.value)
uow.py
UnitOfWorkID = NewType('UnitOfWorkID', UUID)
class UnitOfWorkDTO:
id: UnitOfWorkID
value: Optional[Text]
class UnitOfWork:
id: UnitOfWorkID
dto: UnitOfWorkDTO
class Event:
pass
class Updated(Event):
pass
def __init__(self, dto: UnitOfWorkDTO) -> None:
self.id = dto.id
self.dto = dto
@classmethod
def create(cls) -> 'UnitOfWork':
dto = UnitOfWorkDTO()
dto.id = UnitOfWorkID(uuid1())
dto.value = None
return UnitOfWork(dto)
def update(self, value: Text) -> Updated:
self.dto.value = value
return self.Updated()
repository.py
class ORMRepository(Repository):
def __init__(self, session: Session):
self._session = session
self._query = self._session.query(UnitOfWorkMapper)
def get(self, uow_id: UnitOfWorkID) -> UnitOfWork:
dto = self._query.filter_by(uuid=uow_id).one_or_none()
if not dto:
raise NotFound(uow_id)
return UnitOfWork(dto)
def save(self, uow: UnitOfWork) -> None:
self._session.add(uow.dto)
self._session.flush()
entities_t = Table = Table(
'entities',
meta,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('uuid', String, unique=True, index=True),
Column('value', String, nullable=True),
)
UnitOfWorkMapper = mapper(
UnitOfWorkDTO,
entities_t,
properties={
'id': entities_t.c.uuid,
'value': entities_t.c.value,
},
column_prefix='_db_column_',
)
https://lukeonpython.blog/2020/04/my-structure-for-ddd-component/
您可以在此处找到此示例的完整来源 https://github.com/lzukowski/lzukowski.github.io/tree/master/examples/ddd_component
我正在尝试在 Python 项目中使用 域驱动设计 (DDD),但它看起来像很多样板代码。我想我走错了路。
我有三个文件,都为每个用途定义了项目。感觉太多了。此外,我经常转换字典,但我试图将目的分开。
这个主题不应该是基于意见的,因为我正在尝试遵循 DDD 方法并且应该有一个可遵循的模式。
下面代码的相关部分。请仔细查看 ItemRepository
.
/domain/item.py
"""
Vanilla Python class, business level
"""
class ItemDomain:
def __init__(self, name):
self.name = name
@classmethod
def from_dictionary(cls, dictionary):
return cls(name=dictionary['name'])
def to_dictionary(self):
return {'name': self.name }
/model/item.py
"""
Persistent model for SQLAlchemy
"""
class ItemModel(DefaultModel):
__tablename__ = 'items'
name = Column(Text)
/schema/item.py
"""
Schema for Marshmallow
"""
class ItemSchema(Schema):
name = fields.Str(required=True)
/repository/item.py
class ItemRepository:
def get_one(item_id):
# ...
model = session.query(ItemModel).filter_by(item_id=item_id).first()
return ItemDomain.from_dictionary(dict(model))
def add_one(item: ItemDomain):
# ...
item = item.to_dictionary()
ItemSchema().load(item) # validation: will raise an exception if invalid
model = ItemModel()
model.from_dictionary(item)
session.add(model)
# ...
我怎样才能拥有一个没有开销的干净架构?
为了回答您的问题,我开了一个博客 post,您可以在此处找到该博客:https://lukeonpython.blog/2020/04/my-structure-for-ddd-component/。 目前你只有代码片段,稍后我会添加一些描述:-)。
但一般来说,DDD 应该是一个独立的组件,具有用于纯数据对象进行通信的外观。这个外观是一个应用程序服务。在我的例子中,它是命令处理程序和查询处理程序。大多数测试都是使用外观的 BDD 测试。有时领域逻辑复杂,可以在aggregate/UnitOfWork上使用单元测试。 您的应用程序架构将 DDD 元素拆分为不同的包,我不喜欢。使用这种方法,您将失去对组件边界的控制。您需要从该组件获得的所有内容都应导出到 init.py。 通常,它是一个带有命令的命令处理程序。如果您需要数据视图,请查询处理程序。事件侦听器注册可能的事件。
如果您不确定是否需要所有这些东西,您可以从外观上的 BDD 测试和非常简化的内部实现开始。因此,具有直接使用 DTO 的业务逻辑的命令处理程序。以后如果事情变得复杂,您可以轻松重构。但适当的界限是成功的关键。另外,请记住,如果您觉得所有这些元素和代码都是开销,那么您可能不需要 DDD 方法。也许它不符合 DDD 的条件。
这里是一个带有组件包结构代码片段的小示例。我使用这样的东西:
- 迁移/
- app.py
- commands.py
- events.py
- exceptions.py
- repository.py
- service.py
- uow.py
在迁移中,我更喜欢为这个特定组件使用带分支的 alembic。所以不会对项目中的其他组件产生依赖。
app.py 是容器依赖注入的地方。它主要用于向应用程序服务和存储库依赖项注入适当的存储库。
对于其余的模块,我将在此处提供一些片段。
commands.py
@dataclass
class Create(Command):
command_id: CommandID = field(default_factory=uuid1)
timestamp: datetime = field(default_factory=datetime.utcnow
service.py
class CommandHandler:
def __init__(self, repository: Repository) -> None:
self._repository = repository
self._listeners: List[Listener] = []
super().__init__()
def register(self, listener: Listener) -> None:
if listener not in self._listeners:
self._listeners.append(listener)
def unregister(self, listener: Listener) -> None:
if listener in self._listeners:
self._listeners.remove(listener)
@safe
@singledispatchmethod
def handle(self, command: Command) -> Optional[Event]:
uow: UnitOfWork = self._repository.get(command.uow_id)
event: Event = app_event(self._handle(command, uow), command)
for listener in self._listeners:
listener(event)
self._repository.save(uow)
return event
@safe
@handle.register(Create)
def create(self, command: Create) -> Event:
uow = UnitOfWork.create()
self._repository.save(uow)
return Created(command.command_id, uow.id)
@singledispatchmethod
def _handle(self, c: Command, u: UnitOfWork) -> UnitOfWork.Event:
raise NotImplementedError
@_handle.register(UpdateValue)
def _(self, command: UpdateValue, uow: UnitOfWork) -> UnitOfWork.Event:
return uow.update(command.value)
uow.py
UnitOfWorkID = NewType('UnitOfWorkID', UUID)
class UnitOfWorkDTO:
id: UnitOfWorkID
value: Optional[Text]
class UnitOfWork:
id: UnitOfWorkID
dto: UnitOfWorkDTO
class Event:
pass
class Updated(Event):
pass
def __init__(self, dto: UnitOfWorkDTO) -> None:
self.id = dto.id
self.dto = dto
@classmethod
def create(cls) -> 'UnitOfWork':
dto = UnitOfWorkDTO()
dto.id = UnitOfWorkID(uuid1())
dto.value = None
return UnitOfWork(dto)
def update(self, value: Text) -> Updated:
self.dto.value = value
return self.Updated()
repository.py
class ORMRepository(Repository):
def __init__(self, session: Session):
self._session = session
self._query = self._session.query(UnitOfWorkMapper)
def get(self, uow_id: UnitOfWorkID) -> UnitOfWork:
dto = self._query.filter_by(uuid=uow_id).one_or_none()
if not dto:
raise NotFound(uow_id)
return UnitOfWork(dto)
def save(self, uow: UnitOfWork) -> None:
self._session.add(uow.dto)
self._session.flush()
entities_t = Table = Table(
'entities',
meta,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('uuid', String, unique=True, index=True),
Column('value', String, nullable=True),
)
UnitOfWorkMapper = mapper(
UnitOfWorkDTO,
entities_t,
properties={
'id': entities_t.c.uuid,
'value': entities_t.c.value,
},
column_prefix='_db_column_',
)
https://lukeonpython.blog/2020/04/my-structure-for-ddd-component/
您可以在此处找到此示例的完整来源 https://github.com/lzukowski/lzukowski.github.io/tree/master/examples/ddd_component