在 Node.js 中调度 CQRS 消息
Dispatching CQRS messages in Node.js
我想为 Node 应用程序做 CQRS。
我不是 Node 人员,我来自 .NET,它有一个名为 MediatR 的出色库,它可以将 commands/queries 分派给可用于分离请求和处理程序的中介。所以它允许非常简单和优雅的 CQRS。
在 Node 世界中,我发现了很多 libraries/blogs,但它们也总是包含事件溯源。我对 ES 不感兴趣。
我可以很好地为命令和查询建模,但是那又怎样呢?他们需要以解耦的方式被派往某个地方,以避免混乱。
据我目前对 Node 平台的了解,一个可能的解决方案是使用观察者模式(通过 RxJs 库),这样控制器就可以向观察者发送消息(即 CQRS 请求),然后观察者发布订阅者的相应事件(即请求处理程序)。这在类似 DDD 的设计中解耦了控制器和服务。虽然我不确定如何将结果传回控制器。
其他人也是这样吗?在 Node 中有更好的方法吗?
TL:DR :你不需要一些花哨的框架,特别是当你只进行进程内通信时,应用 CQRS 架构。原生EventEmitter
from the events
module is enough. If you want inter-process communication servicebus
does a really good job. To take a look at an implementation exemple (of the following long version answer) you can dive into the code of this repository : simple node cqrs
让我们以一个非常简单的聊天应用程序为例,如果聊天未关闭,您可以发送消息,并且 like/unlike 消息。
我们的主要聚合(或概念上的聚合根)是 Chat
(writeModel/domain/chat.js
):
const Chat = ({ id, isClosed } = {}) =>
Object.freeze({
id,
isClosed,
});
然后,我们有一个 Message
聚合 (writeModel/domain/message.js
) :
const Message = ({ id, chatId, userId, content, sentAt, messageLikes = [] } = {}) =>
Object.freeze({
id,
chatId,
userId,
content,
sentAt,
messageLikes,
});
发送消息的行为可能是 (writeModel/domain/chat.js
) :
const invariant = require('invariant');
const { Message } = require('./message');
const Chat = ({ id, isClosed } = {}) =>
Object.freeze({
id,
isClosed,
});
const sendMessage = ({ chatState, messageId, userId, content, sentAt }) => {
invariant(!chatState.isClosed, "can't post in a closed chat");
return Message({ id: messageId, chatId: chatState.id, userId, content, sentAt });
};
我们现在需要命令 (writeModel/domain/commands.js
) :
const commands = {
types: {
SEND_MESSAGE: '[chat] send a message',
},
sendMessage({ chatId, userId, content, sentAt }) {
return Object.freeze({
type: commands.types.SEND_MESSAGE,
payload: {
chatId,
userId,
content,
sentAt,
},
});
},
};
module.exports = {
commands,
};
因为我们在 javascript,我们没有 interface
来提供抽象,所以我们使用 higher order functions
(writeModel/domain/getChatOfId.js
) :
const { Chat } = require('./message');
const getChatOfId = (getChatOfId = async id => Chat({ id })) => async id => {
try {
const chatState = await getChatOfId(id);
if (typeof chatState === 'undefined') {
throw chatState;
}
return chatState;
} catch (e) {
throw new Error(`chat with id ${id} was not found`);
}
};
module.exports = {
getChatOfId,
};
(writeModel/domain/saveMessage.js
) :
const { Message } = require('./message');
const saveMessage = (saveMessage = async (messageState = Message()) => {}) => saveMessage;
module.exports = {
saveMessage,
};
我们现在需要实现我们的commandHandlers
(应用服务层):
(writeModel/commandHandlers/handleSendMessage.js
)
const { sendMessage } = require('../domain/chat');
const handleSendMessage = ({
getChatOfId,
getNextMessageId,
saveMessage,
}) => async sendMessageCommandPayload => {
const { chatId, userId, content, sentAt } = sendMessageCommandPayload;
const chat = await getChatOfId(chatId);
return saveMessage(
sendMessage({
chatState: chat,
messageId: getNextMessageId(),
userId,
content,
sentAt,
}),
);
};
module.exports = {
handleSendMessage,
};
由于 javascript 中没有 interface
,我们使用 higher order functions
通过在运行时注入依赖项来应用依赖倒置原则。
然后我们可以实现写模型的组合根:(`writeModel/index.js) :
const { handleSendMessage } = require('./commandHandlers/handleSendMessage');
const { commands } = require('./domain/commands');
const SimpleNodeCQRSwriteModel = ({
dispatchCommand,
handleCommand,
getChatOfId,
getNextMessageId,
saveMessage,
}) => {
handleCommand(
commands.types.SEND_MESSAGE,
handleSendMessage({ getChatOfId, getNextMessageId, saveMessage }),
);
};
module.exports = {
SimpleNodeCQRSwriteModel,
};
你的 commands
和 command handler
没有绑定在一起,然后你可以在运行时提供这些功能的实现,例如内存数据库和节点 EventEmitter
( writeModel/infrastructure/inMemory/index.js
) :
const uuid = require('uuid/v1');
const { saveMessage } = require('../../domain/saveMessage');
const { getChatOfId } = require('../../domain/getChatOfId');
const { getNextMessageId } = require('../../domain/getNextMessageId');
const InMemoryRepository = (initialDbState = { chats: {}, messages: {}, users: {} }) => {
const listeners = [];
const db = {
...initialDbState,
};
const addOnDbUpdatedListener = onDbUpdated => listeners.push(onDbUpdated);
const updateDb = updater => {
updater();
listeners.map(listener => listener(db));
};
const saveMessageInMemory = saveMessage(async messageState => {
updateDb(() => (db.messages[messageState.id] = messageState));
});
const getChatOfIdFromMemory = getChatOfId(async id => db.chats[id]);
const getNextMessageUuid = getNextMessageId(uuid);
return {
addOnDbUpdatedListener,
saveMessage: saveMessageInMemory,
getChatOfId: getChatOfIdFromMemory,
getNextMessageId: getNextMessageUuid,
};
};
module.exports = {
InMemoryRepository,
};
我们的 TestWriteModel
将它们结合在一起 :
const EventEmitter = require('events');
const { SimpleNodeCQRSwriteModel } = require('../writeModel');
const { InMemoryRepository } = require('../writeModel/infrastructure/inMemory');
const TestWriteModel = () => {
const { saveMessage, getChatOfId, getNextMessageId } = InMemoryRepository();
const commandEmitter = new EventEmitter();
const dispatchCommand = command => commandEmitter.emit(command.type, command.payload);
const handleCommand = (commandType, commandHandler) => {
commandEmitter.on(commandType, commandHandler);
};
return SimpleNodeCQRSwriteModel({
dispatchCommand,
handleCommand,
getChatOfId,
getNextMessageId,
saveMessage,
});
};
您可以深入研究此存储库中的代码(使用非常简单的 read model
):simple node cqrs
我想为 Node 应用程序做 CQRS。
我不是 Node 人员,我来自 .NET,它有一个名为 MediatR 的出色库,它可以将 commands/queries 分派给可用于分离请求和处理程序的中介。所以它允许非常简单和优雅的 CQRS。
在 Node 世界中,我发现了很多 libraries/blogs,但它们也总是包含事件溯源。我对 ES 不感兴趣。
我可以很好地为命令和查询建模,但是那又怎样呢?他们需要以解耦的方式被派往某个地方,以避免混乱。
据我目前对 Node 平台的了解,一个可能的解决方案是使用观察者模式(通过 RxJs 库),这样控制器就可以向观察者发送消息(即 CQRS 请求),然后观察者发布订阅者的相应事件(即请求处理程序)。这在类似 DDD 的设计中解耦了控制器和服务。虽然我不确定如何将结果传回控制器。
其他人也是这样吗?在 Node 中有更好的方法吗?
TL:DR :你不需要一些花哨的框架,特别是当你只进行进程内通信时,应用 CQRS 架构。原生EventEmitter
from the events
module is enough. If you want inter-process communication servicebus
does a really good job. To take a look at an implementation exemple (of the following long version answer) you can dive into the code of this repository : simple node cqrs
让我们以一个非常简单的聊天应用程序为例,如果聊天未关闭,您可以发送消息,并且 like/unlike 消息。
我们的主要聚合(或概念上的聚合根)是 Chat
(writeModel/domain/chat.js
):
const Chat = ({ id, isClosed } = {}) =>
Object.freeze({
id,
isClosed,
});
然后,我们有一个 Message
聚合 (writeModel/domain/message.js
) :
const Message = ({ id, chatId, userId, content, sentAt, messageLikes = [] } = {}) =>
Object.freeze({
id,
chatId,
userId,
content,
sentAt,
messageLikes,
});
发送消息的行为可能是 (writeModel/domain/chat.js
) :
const invariant = require('invariant');
const { Message } = require('./message');
const Chat = ({ id, isClosed } = {}) =>
Object.freeze({
id,
isClosed,
});
const sendMessage = ({ chatState, messageId, userId, content, sentAt }) => {
invariant(!chatState.isClosed, "can't post in a closed chat");
return Message({ id: messageId, chatId: chatState.id, userId, content, sentAt });
};
我们现在需要命令 (writeModel/domain/commands.js
) :
const commands = {
types: {
SEND_MESSAGE: '[chat] send a message',
},
sendMessage({ chatId, userId, content, sentAt }) {
return Object.freeze({
type: commands.types.SEND_MESSAGE,
payload: {
chatId,
userId,
content,
sentAt,
},
});
},
};
module.exports = {
commands,
};
因为我们在 javascript,我们没有 interface
来提供抽象,所以我们使用 higher order functions
(writeModel/domain/getChatOfId.js
) :
const { Chat } = require('./message');
const getChatOfId = (getChatOfId = async id => Chat({ id })) => async id => {
try {
const chatState = await getChatOfId(id);
if (typeof chatState === 'undefined') {
throw chatState;
}
return chatState;
} catch (e) {
throw new Error(`chat with id ${id} was not found`);
}
};
module.exports = {
getChatOfId,
};
(writeModel/domain/saveMessage.js
) :
const { Message } = require('./message');
const saveMessage = (saveMessage = async (messageState = Message()) => {}) => saveMessage;
module.exports = {
saveMessage,
};
我们现在需要实现我们的commandHandlers
(应用服务层):
(writeModel/commandHandlers/handleSendMessage.js
)
const { sendMessage } = require('../domain/chat');
const handleSendMessage = ({
getChatOfId,
getNextMessageId,
saveMessage,
}) => async sendMessageCommandPayload => {
const { chatId, userId, content, sentAt } = sendMessageCommandPayload;
const chat = await getChatOfId(chatId);
return saveMessage(
sendMessage({
chatState: chat,
messageId: getNextMessageId(),
userId,
content,
sentAt,
}),
);
};
module.exports = {
handleSendMessage,
};
由于 javascript 中没有 interface
,我们使用 higher order functions
通过在运行时注入依赖项来应用依赖倒置原则。
然后我们可以实现写模型的组合根:(`writeModel/index.js) :
const { handleSendMessage } = require('./commandHandlers/handleSendMessage');
const { commands } = require('./domain/commands');
const SimpleNodeCQRSwriteModel = ({
dispatchCommand,
handleCommand,
getChatOfId,
getNextMessageId,
saveMessage,
}) => {
handleCommand(
commands.types.SEND_MESSAGE,
handleSendMessage({ getChatOfId, getNextMessageId, saveMessage }),
);
};
module.exports = {
SimpleNodeCQRSwriteModel,
};
你的 commands
和 command handler
没有绑定在一起,然后你可以在运行时提供这些功能的实现,例如内存数据库和节点 EventEmitter
( writeModel/infrastructure/inMemory/index.js
) :
const uuid = require('uuid/v1');
const { saveMessage } = require('../../domain/saveMessage');
const { getChatOfId } = require('../../domain/getChatOfId');
const { getNextMessageId } = require('../../domain/getNextMessageId');
const InMemoryRepository = (initialDbState = { chats: {}, messages: {}, users: {} }) => {
const listeners = [];
const db = {
...initialDbState,
};
const addOnDbUpdatedListener = onDbUpdated => listeners.push(onDbUpdated);
const updateDb = updater => {
updater();
listeners.map(listener => listener(db));
};
const saveMessageInMemory = saveMessage(async messageState => {
updateDb(() => (db.messages[messageState.id] = messageState));
});
const getChatOfIdFromMemory = getChatOfId(async id => db.chats[id]);
const getNextMessageUuid = getNextMessageId(uuid);
return {
addOnDbUpdatedListener,
saveMessage: saveMessageInMemory,
getChatOfId: getChatOfIdFromMemory,
getNextMessageId: getNextMessageUuid,
};
};
module.exports = {
InMemoryRepository,
};
我们的 TestWriteModel
将它们结合在一起 :
const EventEmitter = require('events');
const { SimpleNodeCQRSwriteModel } = require('../writeModel');
const { InMemoryRepository } = require('../writeModel/infrastructure/inMemory');
const TestWriteModel = () => {
const { saveMessage, getChatOfId, getNextMessageId } = InMemoryRepository();
const commandEmitter = new EventEmitter();
const dispatchCommand = command => commandEmitter.emit(command.type, command.payload);
const handleCommand = (commandType, commandHandler) => {
commandEmitter.on(commandType, commandHandler);
};
return SimpleNodeCQRSwriteModel({
dispatchCommand,
handleCommand,
getChatOfId,
getNextMessageId,
saveMessage,
});
};
您可以深入研究此存储库中的代码(使用非常简单的 read model
):simple node cqrs