用于流式数据的 CQRS

CQRS for streamed data

我正在使用 CQRS 隔离并且非常适合从一个节点到远程节点的事务性命令或请求-响应。

我有一个用例,其中将向远程节点发出命令,这将导致 "stream" 数据(很像远程命令 运行,并且服务器给我们随着文本的进展更新文本):

// this is sent from requesting node to remote node to initiate the stream
public class LongRunningCommand: ICommand 
{
    Guid Session { get; set; } // the session ID to use
    string CommandLine {get; set; } // the command the remote note will run
}

此数据随后在一段时间内以多个数据包的形式从远程节点发送到请求节点:

// this is sent from remote node to requestor in multiple updates over time
public class UpdateProgress: ICommand
{
    Guid Session { get; set; } // possibility to multiplex sessions
    int Sequence { get; set; } // de-dupe/resequencing out of order packets (lower QOS)
    byte[] Payload { get; set; } // the data to be passed to the application
}

这不是真正的命令,也不是请求-回复(因为有多个回复)- 它是一个很长的 运行 会话,但我不确定这与CQRS.

最好的订购方式是什么?我的请求节点是否可以具有如下所示的命令处理程序(其中 UpdateProgress 是正在处理的 "command"):

public class UpdateProgressCommandHandler : ICommandHandler<UpdateProgress>
{
   public async Task HandleAsync(UpdateProgress message)
   {
      // resequence in handler or chained infrastructure - omitted for brevity
      var window = GetWindowForSession(message.Session);
      var updateFromServer = System.Text.Encoding.UTF8.GetString(message.Payload);
      await window.WriteLine(updateFromServer);
   }
}

以上内容有效(我认为还不错),但术语似乎有点古怪(命令名称 UpdateProgress 与其说是命令,不如说是事件)。

或者我是否最好放弃 commands/queries 的概念,并使用完整的事件总线,如果我这样做了,我将如何处理初始请求,因为这不是一个事件,它更像是一个命令(这在处理事件的事件总线上语义上没有意义 - 而不是命令或查询)。

或者我只是陷入了命名约定?由于第一次这样做,感谢上述用例的最佳实践视图。

我不确定我是否理解正确,需要与远程节点通信的 Command 似乎不属于您的域。这并不是说它不是 Command,您仍然可以将其定义为 Command,但不能在您的 Domain IMO 中定义。您可以在此处查看集成事件。

在不完全了解您的领域的情况下,您可以通过以下方式定义流程:

  • 执行命令修改您的域(类似于Status: Pending

  • 将集成事件从您的 CommandHandler 引发到单独的工作程序/服务总线中

  • 单独的worker完成流程,然后引发另一个集成事件

  • 您的工作人员订阅该事件并更新您域的相关部分(例如 Status: Completed)。