Rebus 新手问题

Rebus Newbie Questions

我一直在评估不同的 .Net 消息传递组件,我在使用 MSMQ 的 Rebus 方面遇到了一些问题,我无法通过文档、示例或 google.[=12 解决这些问题=]

我认为我的方案唯一独特之处在于我的客户端可以发送或发布消息。看起来应该很简单,但我遇到了以下问题。代码片段跟在问题之后。

问题:

  1. 未处理 MetersRequest。它们只是堆积在 outgoingMetersRequests 队列中。
  2. 收到一条错误消息,指出 creditAuthRequests 队列不存在或我没有权限。我已经确认队列没有被创建,但我的假设是 Rebus 将确保它在创建 outgoingMetersRequests 队列时创建。 (旁注:如果我从 Bus.Send(...) 中删除 'await',错误就会消失,但队列仍未创建,消息也无处可寻。)

===========

来自客户

    private void InitializeBus()
    {
         _messageActivator = new BuiltinHandlerActivator();
         Configure.With(_messageActivator)
            .Transport(t => t.UseMsmq("publisher"))
            .Routing(r => r.TypeBased().Map<MetersRequest>("outgoingMetersRequests")
            .Map<CreditAuthorizationRequest>("creditAuthRequests"))
            .Start();
    }

    private async Task SendCreditAuthRequestAsync(int numberToSend)
    {
         var cardNumber = generateCardNumber();
         await _messageActivator.Bus.Send(new CreditAuthorizationRequest(cardNumber));
         await WriteOutputAsync($"Sent credit auth request for card {cardNumber}.");
    }

    private async Task SendMetersRequestAsync(int numberToSend)
    {
         await _messageActivator.Bus.Send(new MetersRequest());
         await WriteOutputAsync("Sent meters request.");
    }

=========== 终端客户

============

来自服务

    private void InitializeBus()
    {
        _messageActivator = new BuiltinHandlerActivator();
        _messageActivator.Register<PosOnlineHandler>(() => new PosOnlineHandler(WriteOutputAsync));
        _messageActivator.Register<PumpDownHandler>(() => new PumpDownHandler(WriteOutputAsync));
        _messageActivator.Register<MetersRequestHandler>(() => new MetersRequestHandler(WriteOutputAsync, _messageActivator.Bus));
        _messageActivator.Register<CreditAuthorizationHandler>(() => new CreditAuthorizationHandler(WriteOutputAsync, _messageActivator.Bus));
        Configure.With(_messageActivator)
            .Transport(t => t.UseMsmq("subscriber1"))
            .Routing(r => r.TypeBased()
                .Map<PumpDownEvent>("publisher")
                .Map<PosOnlineEvent>("publisher")
                .Map<MetersRequest>("outgoingMetersRequests")
                .Map<CreditAuthorizationRequest>("creditAuthRequests"))
        .Start();
       _messageActivator.Bus.Subscribe<PumpDownEvent>().Wait();
       _messageActivator.Bus.Subscribe<PosOnlineEvent>().Wait();
    }

public class MetersRequestHandler : IHandleMessages<MetersRequest>
{
    private readonly Random _randomizer = new Random();
    private readonly Func<string, Task> _outputDelegate2;
    private readonly IBus _messageBus;

    public MetersRequestHandler(Func<string, Task> outputDelegate, IBus messageBus)
    {
        _outputDelegate2 = outputDelegate;
        _messageBus = messageBus;
    }

    public async Task Handle(MetersRequest message)
    {
        var pump = _randomizer.Next(20);
        var meters = _randomizer.Next();
        decimal dollars = (decimal)_randomizer.NextDouble();

        var response = new MetersResponse(pump, meters, dollars);
        await _outputDelegate2($"Sending MetersResponse: (Pump={pump}) (Meters={meters}) (Dollars ={dollars}");
        await _messageBus.Reply(response);
    }
}

============

您发布的代码中有几处似乎有点不对劲。我只会评论你的问题,然后我会提出前进的方向:)

MetersRequest not being handled. They are just piling up in outgoingMetersRequests queue.

当您调用 .Routing(t => t.TypeBased().Map<SomeMessage>("someQueue")) 时,您是说 SomeMessage 类型是 Rebus 端点的 "owned" 和输入队列 someQueue.

当你await bus.Send(yourMessage)时,Rebus会得到拥有yourMessage类型的队列,并向那里发送消息。这很好地解释了为什么您的 MetersRequest 会进入 outgoingMetersRequests 队列。

但是,您尚未发布任何代码来显示将 outgoingMetersRequests 队列作为其输入队列的端点。需要有人处理该队列中的消息才能发生某些事情。

"input queue" 是你在 .Transport(t => t.UseMsmq("publisher")) 部分配置的——在这种情况下,输入队列是 publisher.

Getting an error saying that creditAuthRequests queue doesn't exist or I don't have permissions.

是的 – MSMQ 错误消息通常不是最好的 ;)

I've confirmed the queue is not being created, but my assumption is that Rebus will ensure it's created just as the outgoingMetersRequests queue is being created.

Rebus(使用 MSMQ)仅创建输入队列和错误队列(默认称为 error)。这意味着您必须有一个端点 运行 使用 outgoingMetersRequests 作为其输入队列至少一次,或者您可能手动创建了队列?

(Side note: The error goes away if I remove the 'await' from Bus.Send(...), but the queue is still not created and the messages are no where to be found.)

错误并没有消失——异常只是被捕获并传递给一个在线程池线程上运行的延续,你永远看不到它,因为你放开了从 [=23] 返回的 Task =].

我建议你这样做:

  1. 为输入队列起个好名字——publishersubscriber太笼统了,你应该选择与每个端点的责任或存在于队列中的原因相对应的名称第一名.

  2. 注意您的消息类型。看来你是对的,因为你已经将事件命名为 *Event,而且你对 request/reply 的使用看起来也是正确的。

  3. 使用某种共享订阅存储,例如中央 SQL 服务器。这样——如果您在所有发布者和订阅者中配置相同的集中式订阅存储——您不需要为您的事件类型进行任何端点映射,这非常简洁。您可以在 the wiki page about subscription storages in the section about the centralized type.

  4. 上阅读更多内容