Rebus Windsor 容器和内联消息处理程序

Rebus Windsor container and inlinemessage handlers

我正在尝试使用 Rebus 执行请求/回复,但也使用 Windsor 容器 NuGet 包:https://github.com/rebus-org/Rebus.CastleWindsor

查看以下示例后:

https://github.com/rebus-org/RebusSamples/tree/master/RequestReply

https://github.com/rebus-org/RebusSamples/tree/master/Integration

我拼凑了下面这个例子:

public class ContactModule : NancyModule
{
    public ContactModule(IBus bus)
    {
        Get["/v1/contact"] = parameters =>
        {
            var contacts = new List<Contact>();

            using (var activator = new BuiltinHandlerActivator())
            {
                var settings = new Settings();

                activator.Handle<GetContactsResponse>(response =>
                {
                    contacts = response.Contacts;

                    return Task.FromResult(0);
                });

                Configure.With(activator)
                    .Logging(l => l.ColoredConsole(LogLevel.Warn))
                    .Transport(t => t.UseRabbitMq(
                        settings.RabbitMQConnectionString,
                        settings.OutputQueueName)) // we listen for messages in the output queue
                    .Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName)) // but we publish to the input queue
                    .Options(o =>
                    {
                        o.EnableCompression();
                        o.EnableEncryption(settings.MessageEncryptionKey);
                    })
                    .Start();

                activator.Bus.Publish(new GetContactsRequest``()).Wait();
            }

            return Response.AsJson(contacts);
        };
    }
}

我知道使用 Windsor 容器方法时它可能看起来更像下面,但我不知道如何听回复:

public class ContactModule : NancyModule
{
    public ContactModule(IBus bus)
    {
        Get["/v1/contact"] = parameters =>
        {
            var contacts = new List<Contact>();

            bus.Publish(new GetContactsRequest()).Wait();

            // How do I listen for the reply?

            return Response.AsJson(contacts);
        };
    }
}

我的消息处理程序:

public class GetContactsHandler : IHandleMessages<GetContactsRequest>
{
    private readonly IBus _bus;
    private readonly Settings _settings;

    public GetContactsHandler(IBus bus, Settings settings)
    {
        _bus = bus;
        _settings = settings;
    }

    public async Task Handle(GetContactsRequest request)
    {
        // Fetch contacts from db...
        var contacts = new List<Contact>();
        await _bus.Reply(new GetContactsResponse {Contacts = contacts});
    }
} 

我的 Rebus Windsor 安装程序:

public class RebusInstaller : IWindsorInstaller
{
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        var settings = new Settings();

        Configure.With(new CastleWindsorContainerAdapter(container))
            .Logging(l => l.ColoredConsole(LogLevel.Warn))
            .Transport(t => t.UseRabbitMqAsOneWayClient(
                settings.RabbitMQConnectionString))
            .Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName)) 
            .Options(o =>
            {
                o.EnableCompression();
                o.EnableEncryption(settings.MessageEncryptionKey);
            })
            .Start();
    }
}

我面临的问题是我想在我的网站 api 中使用请求/回复模式来请求联系人列表,等待包含检索到的联系人的回复和 return他们给 api 来电者。

但是如果我想为 Rebus 使用 Windsor 容器适配器,IHandlerActivator 接口不会公开允许注册内联消息处理程序的 .Handle 方法,在该方法中我从回复中获取联系人,然后将它们发回给 api 来电者。

有没有办法做到这一点,还是我处理问题的方式不正确?

编辑:如您在第一个示例中所见,我从 Windsor 容器注入 IBus 接口。 但是,如果我使用注入总线,我该如何告诉它监听从消息处理程序返回的回复?

更新:Rebus.Async 正是我要找的:https://github.com/rebus-org/Rebus.Async

当您使用真正的 IoC 容器时,您应该创建一个实现 IHandleMessages<TMessage> 的 class,例如像这样:

public class GetContactsRequestHandler : IHandleMessages<GetContactsRequest>
{
    readonly IBus _bus;

    public GetContactsRequestHandler(IBus bus)
    {
        _bus = bus;
    }   

    public async Task Handle(GetContactsRequest request)
    {
        var reply = new GetContactsReply(...); 

        await _bus.Reply(reply);
    }
}

当您向您的程序添加处理程序和辅助应用程序和域服务时,它的扩展性会更好。

这当然意味着你必须引入 Rebus.CastleWindsor NuGet 包并传递一个 WindsorContainerAdapter 给 Rebus 的 .With(...) 方法。

当您需要在 Windsor 中注册处理程序时,您可以利用 Rebus.CastleWindsor 中的扩展方法,它允许执行以下操作:

// register all handler types from this assembly
windsorContainer.AutoRegisterHandlersFromThisAssembly();

// register all handler types from typeof(SomeHandler).Assembly
windsorContainer.AutoRegisterHandlersFromAssemblyOf<SomeHandler>();

// register one particular type as a message handler
windsorContainer.RegisterHandler<GetContactsRequestHandler>();

在您添加更多信息后更新 – 您说:

(...) or am I approaching the problem incorrectly?

我会说 "yes",因为服务总线不太擅长同步 request/reply – 你应该为此使用一些东西 "request/reply-oriented",例如HTTP(*).

此外 – 我对南希几乎一无所知,但我很确定

bus.Publish(...).Wait();

会出现死锁,至少如果您在 ASP.NET.

接待南希的话

你应该总是

await bus.Publish(...);

当你可以的时候,在南希你可以这样做:

Get["/v1/contact"] = async (parameters, ct) =>
{
    (...)

    await bus.Publish(...);

    (...)
};

PS:最后一件事:只要您希望端点处理消息,请记住保持容器在身边。

在您发布的代码中,您在发布事件后立即处理 BuiltinHandlerActivator,这意味着您很可能根本无法处理任何消息。

如果你使用Windsor,当你销毁Windsor容器时,总线也会被销毁。


(*) 虽然 Rebus 实际上有 Rebus.Async,这是一个 request/response API,可以在您的场景中使用。

不过我不建议走这条路。如果你打算这样做一次,你应该只这样做,然后再也不会:)