Nodejs ts:事件源和 cqrs,事件总线
Nodejs ts: event-sourcing and cqrs, event bus
你好,我有一个命令总线,一个查询总线,它基本上有一个带有命令或查询名称的密钥对和处理程序,然后我执行应该发布我的事件的命令。
但是我对如何做我的事件总线有一些疑问。
命令总线是事件总线的一部分吗?
我怎么能用处理程序做一个事件总线
命令总线:
export interface ICommand {
}
export interface ICommandHandler<
TCommand extends ICommand = any,
TResult = any
> {
execute(command: TCommand): Promise<TResult>
}
export interface ICommandBus<CommandBase extends ICommand = ICommand> {
execute<T extends CommandBase>(command: T): Promise<any>
register(data:{commandHandler: ICommandHandler, command: ICommand}[]): void
}
命令总线实现:
export class CommandBus<Command extends ICommand = ICommand>
implements ICommandBus<Command> {
private handlers = new Map<string, ICommandHandler<Command>>()
public execute<T extends Command>(command: T): Promise<any> {
const commandName = this.getCommandName(command as any)
const handler = this.handlers.get(commandName)
if (!handler) throw new Error(``)
return handler.execute(command)
}
public register(
data: { commandHandler: ICommandHandler; command: ICommand }[],
): void {
data.forEach(({command,commandHandler}) => {
this.bind(commandHandler, this.getCommandName(command as any))
})
}
private bind<T extends Command>(handler: ICommandHandler<T>, name: string) {
this.handlers.set(name, handler)
}
private getCommandName(command: Function): string {
const { constructor } = Object.getPrototypeOf(command)
return constructor.name as string
}
}
这里出现了另一个问题,谁应该负责在我的事件数据库中发布事件或读取我的事件数据库流是我的 class 事件存储?
事件存储class:
export class EventStoreClient {
[x: string]: any;
/**
* @constructor
*/
constructor(private readonly config: TCPConfig) {
this.type = 'event-store';
this.eventFactory = new EventFactory();
this.connect();
}
connect() {
this.client = new TCPClient(this.config);
return this;
}
getClient() {
return this.client;
}
newEvent(name: any, payload: any) {
return this.eventFactory.newEvent(name, payload);
}
close() {
this.client.close();
return this;
}
}
然后我对如何使用我的事件处理程序和我的事件来实现我的事件总线有疑问。
如果有人能帮助我,我会很高兴..
事件接口:
export interface IEvent {
readonly aggregrateVersion: number
readonly aggregateId: string
}
export interface IEventHandler<T extends IEvent = any> {
handle(event: T): any
}
可能用法:
commandBus.execute(new Command())
class commandHandler {
constructor(repository: IRepository, eventBus ????){}
execute(){
//how i can publish an event with after command handler logic with event bus her
}
}
我发现各种总线和事件存储之间存在一些混淆。在尝试实施事件总线之前,您需要回答一个重要的问题,该问题是任何事件溯源实施的基础:
- 如何将事件存储保存为唯一的真实来源?
也就是说,您的事件存储包含域的完整状态。这也意味着事件总线的消费者(无论它最终是什么——消息队列、流媒体平台、Redis 等)应该只获取持久化的事件。因此,目标变为:
- 仅在总线上传递持久保存到商店的事件(因此,如果您在写入商店时遇到错误,或者可能出现并发异常,请不要通过总线传递!)
- 将所有事件传送给所有感兴趣的消费者,而不会丢失任何事件
这两个目标直观地转化为“我想要在事件存储和事件总线之间进行原子提交”。当它们是同一件事时,这是最容易实现的!
因此,与其考虑如何将“事件总线”连接到命令处理程序并来回发送事件,不如考虑如何从事件中检索已经保存的 事件存储并订阅它。这也消除了命令处理程序和事件订阅者之间的任何依赖性——它们位于事件存储的不同端(编写器与 reader),并且可能在不同的进程中,在不同的机器上。
你好,我有一个命令总线,一个查询总线,它基本上有一个带有命令或查询名称的密钥对和处理程序,然后我执行应该发布我的事件的命令。 但是我对如何做我的事件总线有一些疑问。 命令总线是事件总线的一部分吗? 我怎么能用处理程序做一个事件总线
命令总线:
export interface ICommand {
}
export interface ICommandHandler<
TCommand extends ICommand = any,
TResult = any
> {
execute(command: TCommand): Promise<TResult>
}
export interface ICommandBus<CommandBase extends ICommand = ICommand> {
execute<T extends CommandBase>(command: T): Promise<any>
register(data:{commandHandler: ICommandHandler, command: ICommand}[]): void
}
命令总线实现:
export class CommandBus<Command extends ICommand = ICommand>
implements ICommandBus<Command> {
private handlers = new Map<string, ICommandHandler<Command>>()
public execute<T extends Command>(command: T): Promise<any> {
const commandName = this.getCommandName(command as any)
const handler = this.handlers.get(commandName)
if (!handler) throw new Error(``)
return handler.execute(command)
}
public register(
data: { commandHandler: ICommandHandler; command: ICommand }[],
): void {
data.forEach(({command,commandHandler}) => {
this.bind(commandHandler, this.getCommandName(command as any))
})
}
private bind<T extends Command>(handler: ICommandHandler<T>, name: string) {
this.handlers.set(name, handler)
}
private getCommandName(command: Function): string {
const { constructor } = Object.getPrototypeOf(command)
return constructor.name as string
}
}
这里出现了另一个问题,谁应该负责在我的事件数据库中发布事件或读取我的事件数据库流是我的 class 事件存储?
事件存储class:
export class EventStoreClient {
[x: string]: any;
/**
* @constructor
*/
constructor(private readonly config: TCPConfig) {
this.type = 'event-store';
this.eventFactory = new EventFactory();
this.connect();
}
connect() {
this.client = new TCPClient(this.config);
return this;
}
getClient() {
return this.client;
}
newEvent(name: any, payload: any) {
return this.eventFactory.newEvent(name, payload);
}
close() {
this.client.close();
return this;
}
}
然后我对如何使用我的事件处理程序和我的事件来实现我的事件总线有疑问。
如果有人能帮助我,我会很高兴..
事件接口:
export interface IEvent {
readonly aggregrateVersion: number
readonly aggregateId: string
}
export interface IEventHandler<T extends IEvent = any> {
handle(event: T): any
}
可能用法:
commandBus.execute(new Command())
class commandHandler {
constructor(repository: IRepository, eventBus ????){}
execute(){
//how i can publish an event with after command handler logic with event bus her
}
}
我发现各种总线和事件存储之间存在一些混淆。在尝试实施事件总线之前,您需要回答一个重要的问题,该问题是任何事件溯源实施的基础:
- 如何将事件存储保存为唯一的真实来源?
也就是说,您的事件存储包含域的完整状态。这也意味着事件总线的消费者(无论它最终是什么——消息队列、流媒体平台、Redis 等)应该只获取持久化的事件。因此,目标变为:
- 仅在总线上传递持久保存到商店的事件(因此,如果您在写入商店时遇到错误,或者可能出现并发异常,请不要通过总线传递!)
- 将所有事件传送给所有感兴趣的消费者,而不会丢失任何事件
这两个目标直观地转化为“我想要在事件存储和事件总线之间进行原子提交”。当它们是同一件事时,这是最容易实现的!
因此,与其考虑如何将“事件总线”连接到命令处理程序并来回发送事件,不如考虑如何从事件中检索已经保存的 事件存储并订阅它。这也消除了命令处理程序和事件订阅者之间的任何依赖性——它们位于事件存储的不同端(编写器与 reader),并且可能在不同的进程中,在不同的机器上。