同一个应用能不能同时publisher/subscriber和NServiceBus?

Can the same application be both publisher/subscriber with NServiceBus?

我是消息传递架构方面的新手,所以我可能走错了路。但是我想通过解决一个小问题在我的团队中慢慢引入 NServiceBus。

议程中的约会有状态。两个用户可能正在同一应用程序的同一议程中查看同一约会。他们通过中央服务器上的远程会话启动此应用程序。因此,如果用户 1 更新约会状态,我希望用户 2 看到新状态 'real time'。

为了对此进行模拟或进行概念验证(如果您愿意),我制作了一个新的控制台应用程序。通过 NuGet 我得到了 NServiceBus 和 NServiceBus.Host,因为我从文档中了解到我需要两者。而且我知道在生产代码中不建议将所有内容都放在同一个程序集中,但是发布者和订阅者很可能最终会在同一个程序集中...

在class程序方法Main中我写了如下代码:

BusConfiguration configuration = new BusConfiguration();

configuration.UsePersistence<InMemoryPersistence>();
configuration.UseSerialization<XmlSerializer>();
configuration.UseTransport<MsmqTransport>();
configuration.TimeToWaitBeforeTriggeringCriticalErrorOnTimeoutOutages(new TimeSpan(1, 0, 0));
ConventionsBuilder conventions = configuration.Conventions();
conventions.DefiningEventsAs(t => t.Namespace != null
    && t.Namespace.Contains("Events"));

using (IStartableBus bus = Bus.Create(configuration))
{
    bus.Start();

    Console.WriteLine("Press key");
    Console.ReadKey();

    bus.Publish<Events.AppointmentStateChanged>(a =>
    {
        a.AppointmentID = 1;
        a.NewState = "New state";
    });

    Console.WriteLine("Event published.");
    Console.ReadKey();
}

在classEndPointConfig方法自定义我添加了:

configuration.UsePersistence<InMemoryPersistence>();
configuration.UseSerialization<XmlSerializer>();
configuration.UseTransport<MsmqTransport>();
ConventionsBuilder conventions = configuration.Conventions();
conventions.DefiningEventsAs(t => t.Namespace != null
    && t.Namespace.Contains("Events"));

AppointmentStateChanged 是事件文件夹中的一个简单 class,如下所示:

public class AppointmentStateChanged: IEvent {
    public int AppointmentID { get; set; }
    public string NewState { get; set; }
}

AppointmentStateChangedHandler 是事件处理程序:

public class AppointmentStateChangedHandler : IHandleMessages<Events.AppointmentStateChanged> {
public void Handle(Events.AppointmentStateChanged message) {
        Console.WriteLine("AppointmentID: {0}, changed to state: {1}", 
            message.AppointmentID, 
            message.NewState);
    }
}

如果我启动一个控制台应用程序,一切正常。我看到处理程序处理事件。但是,如果我尝试启动第二个控制台应用程序,它会崩溃:System.Messaging.MessageQueueException(请求操作的超时已过期)。所以我一定是做错了什么,让我再次猜测我不理解更高层次的东西。谁能给我指出正确的方向?

更新 除了 AgendaUpdates.Events 命名空间中的事件 class 之外,一切都在命名空间 AgendaUpdates 中。

更新 2 采取的步骤:

--> 我通过使用原始和复制的解决方案启动 2 visual studio 来测试它。然后在 IDE 中启动两个控制台应用程序。

我不太确定您为什么会收到 特定的 异常,但我可以解释您尝试执行的操作失败的原因。问题是在同一个应用程序中没有发布者和订阅者(​​这是可能的并且很有用);问题是您 运行 同一台机器上同一应用程序的两个实例。

NServiceBus 依赖于队列技术(在你的例子中是 MSMQ),为了让一切正常工作,每个应用程序都需要有自己独特的队列。当您启动两个相同的实例时,两者都试图共享同一个队列。

您可以修改一些内容来让您的场景正常工作并更好地理解排队的工作原理:

  1. 更改第二个实例的 EndPointName
  2. 运行 另一台机器上的第二个实例
  3. 将发布者和订阅者分离到不同的进程中

无论您采用哪种方式,您都需要调整 MessageEndpointMappings(在 consumer/subscriber 上)以反映 host/publisher 队列所在的位置(消息类型的 "owner" ):

http://docs.particular.net/nservicebus/messaging/message-owner#configuring-endpoint-mapping

根据您的更新进行编辑

我知道这是对概念的测试 setup/proof,但将这两个部署(相同代码)视为 publisher/host 和 subscriber/client 仍然有用。所以我们称原件为主机,副本为客户端。我假设您不希望每个人都订阅另一个人(至少对于这个基本测试)。

此外,请确保您 运行 在您的计算机上以管理员身份使用这两个 IDE。我不确定这是否干扰。

在副本中,我将 App.Config 端点属性中的 MessageEndpointMappings 更改为 "AgendaUpdates2" 我得到了 MSMQ 异常:"the queue does not exist or you do not have sufficient permissions to perform the operation"

由于副本是客户端,所以您希望将其映射指向主机。所以这应该是 "AgendaUpdates"(省略“2”)。

在副本中我将这行代码添加到 EndPointConfig:configuration.EndpointName("AgendaUpdates2");我收到 MSMQ 异常:"the queue does not exist or you do not have sufficient permissions to perform the operation"

在副本中我在程序class的Main方法中添加了这行代码:configuration.EndpointName("AgendaUpdates2");按下 key

后再次出现原始异常

我最初没有注意到这一点,但您不需要两次配置端点。我相信您的 EndPointConfig 没有被调用,因为它仅在通过 NSB 主机可执行文件托管时使用。您可能只需删除此 class.

这听起来很合理,但请记住,如果它是订阅者,您的副本不应该发布,所以在它开始后不要按任何键(只按原件中的键)。

如果您希望发布者同时也是消息的接收者,您需要在配置中指定。

这个在this article里面解释的很清楚,你的问题的解决方案在文章的最后。