Rebus Windsor 容器和内联消息处理程序
Rebus Windsor container and inlinemessage handlers
我正在尝试使用 Rebus 执行请求/回复,但也使用 Windsor 容器 NuGet 包:https://github.com/rebus-org/Rebus.CastleWindsor
查看以下示例后:
https://github.com/rebus-org/RebusSamples/tree/master/RequestReply
https://github.com/rebus-org/RebusSamples/tree/master/Integration
我拼凑了下面这个例子:
public class ContactModule : NancyModule
{
public ContactModule(IBus bus)
{
Get["/v1/contact"] = parameters =>
{
var contacts = new List<Contact>();
using (var activator = new BuiltinHandlerActivator())
{
var settings = new Settings();
activator.Handle<GetContactsResponse>(response =>
{
contacts = response.Contacts;
return Task.FromResult(0);
});
Configure.With(activator)
.Logging(l => l.ColoredConsole(LogLevel.Warn))
.Transport(t => t.UseRabbitMq(
settings.RabbitMQConnectionString,
settings.OutputQueueName)) // we listen for messages in the output queue
.Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName)) // but we publish to the input queue
.Options(o =>
{
o.EnableCompression();
o.EnableEncryption(settings.MessageEncryptionKey);
})
.Start();
activator.Bus.Publish(new GetContactsRequest``()).Wait();
}
return Response.AsJson(contacts);
};
}
}
我知道使用 Windsor 容器方法时它可能看起来更像下面,但我不知道如何听回复:
public class ContactModule : NancyModule
{
public ContactModule(IBus bus)
{
Get["/v1/contact"] = parameters =>
{
var contacts = new List<Contact>();
bus.Publish(new GetContactsRequest()).Wait();
// How do I listen for the reply?
return Response.AsJson(contacts);
};
}
}
我的消息处理程序:
public class GetContactsHandler : IHandleMessages<GetContactsRequest>
{
private readonly IBus _bus;
private readonly Settings _settings;
public GetContactsHandler(IBus bus, Settings settings)
{
_bus = bus;
_settings = settings;
}
public async Task Handle(GetContactsRequest request)
{
// Fetch contacts from db...
var contacts = new List<Contact>();
await _bus.Reply(new GetContactsResponse {Contacts = contacts});
}
}
我的 Rebus Windsor 安装程序:
public class RebusInstaller : IWindsorInstaller
{
public void Install(IWindsorContainer container, IConfigurationStore store)
{
var settings = new Settings();
Configure.With(new CastleWindsorContainerAdapter(container))
.Logging(l => l.ColoredConsole(LogLevel.Warn))
.Transport(t => t.UseRabbitMqAsOneWayClient(
settings.RabbitMQConnectionString))
.Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName))
.Options(o =>
{
o.EnableCompression();
o.EnableEncryption(settings.MessageEncryptionKey);
})
.Start();
}
}
我面临的问题是我想在我的网站 api 中使用请求/回复模式来请求联系人列表,等待包含检索到的联系人的回复和 return他们给 api 来电者。
但是如果我想为 Rebus 使用 Windsor 容器适配器,IHandlerActivator 接口不会公开允许注册内联消息处理程序的 .Handle 方法,在该方法中我从回复中获取联系人,然后将它们发回给 api 来电者。
有没有办法做到这一点,还是我处理问题的方式不正确?
编辑:如您在第一个示例中所见,我从 Windsor 容器注入 IBus 接口。
但是,如果我使用注入总线,我该如何告诉它监听从消息处理程序返回的回复?
更新:Rebus.Async 正是我要找的:https://github.com/rebus-org/Rebus.Async
当您使用真正的 IoC 容器时,您应该创建一个实现 IHandleMessages<TMessage>
的 class,例如像这样:
public class GetContactsRequestHandler : IHandleMessages<GetContactsRequest>
{
readonly IBus _bus;
public GetContactsRequestHandler(IBus bus)
{
_bus = bus;
}
public async Task Handle(GetContactsRequest request)
{
var reply = new GetContactsReply(...);
await _bus.Reply(reply);
}
}
当您向您的程序添加处理程序和辅助应用程序和域服务时,它的扩展性会更好。
这当然意味着你必须引入 Rebus.CastleWindsor NuGet 包并传递一个 WindsorContainerAdapter
给 Rebus 的 .With(...)
方法。
当您需要在 Windsor 中注册处理程序时,您可以利用 Rebus.CastleWindsor 中的扩展方法,它允许执行以下操作:
// register all handler types from this assembly
windsorContainer.AutoRegisterHandlersFromThisAssembly();
// register all handler types from typeof(SomeHandler).Assembly
windsorContainer.AutoRegisterHandlersFromAssemblyOf<SomeHandler>();
// register one particular type as a message handler
windsorContainer.RegisterHandler<GetContactsRequestHandler>();
在您添加更多信息后更新 – 您说:
(...) or am I approaching the problem incorrectly?
我会说 "yes",因为服务总线不太擅长同步 request/reply – 你应该为此使用一些东西 "request/reply-oriented",例如HTTP(*).
此外 – 我对南希几乎一无所知,但我很确定
bus.Publish(...).Wait();
会出现死锁,至少如果您在 ASP.NET.
接待南希的话
你应该总是
await bus.Publish(...);
当你可以的时候,在南希你可以这样做:
Get["/v1/contact"] = async (parameters, ct) =>
{
(...)
await bus.Publish(...);
(...)
};
PS:最后一件事:只要您希望端点处理消息,请记住保持容器在身边。
在您发布的代码中,您在发布事件后立即处理 BuiltinHandlerActivator
,这意味着您很可能根本无法处理任何消息。
如果你使用Windsor,当你销毁Windsor容器时,总线也会被销毁。
(*) 虽然 Rebus 实际上有 Rebus.Async,这是一个 request/response API,可以在您的场景中使用。
不过我不建议走这条路。如果你打算这样做一次,你应该只这样做,然后再也不会:)
我正在尝试使用 Rebus 执行请求/回复,但也使用 Windsor 容器 NuGet 包:https://github.com/rebus-org/Rebus.CastleWindsor
查看以下示例后:
https://github.com/rebus-org/RebusSamples/tree/master/RequestReply
https://github.com/rebus-org/RebusSamples/tree/master/Integration
我拼凑了下面这个例子:
public class ContactModule : NancyModule
{
public ContactModule(IBus bus)
{
Get["/v1/contact"] = parameters =>
{
var contacts = new List<Contact>();
using (var activator = new BuiltinHandlerActivator())
{
var settings = new Settings();
activator.Handle<GetContactsResponse>(response =>
{
contacts = response.Contacts;
return Task.FromResult(0);
});
Configure.With(activator)
.Logging(l => l.ColoredConsole(LogLevel.Warn))
.Transport(t => t.UseRabbitMq(
settings.RabbitMQConnectionString,
settings.OutputQueueName)) // we listen for messages in the output queue
.Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName)) // but we publish to the input queue
.Options(o =>
{
o.EnableCompression();
o.EnableEncryption(settings.MessageEncryptionKey);
})
.Start();
activator.Bus.Publish(new GetContactsRequest``()).Wait();
}
return Response.AsJson(contacts);
};
}
}
我知道使用 Windsor 容器方法时它可能看起来更像下面,但我不知道如何听回复:
public class ContactModule : NancyModule
{
public ContactModule(IBus bus)
{
Get["/v1/contact"] = parameters =>
{
var contacts = new List<Contact>();
bus.Publish(new GetContactsRequest()).Wait();
// How do I listen for the reply?
return Response.AsJson(contacts);
};
}
}
我的消息处理程序:
public class GetContactsHandler : IHandleMessages<GetContactsRequest>
{
private readonly IBus _bus;
private readonly Settings _settings;
public GetContactsHandler(IBus bus, Settings settings)
{
_bus = bus;
_settings = settings;
}
public async Task Handle(GetContactsRequest request)
{
// Fetch contacts from db...
var contacts = new List<Contact>();
await _bus.Reply(new GetContactsResponse {Contacts = contacts});
}
}
我的 Rebus Windsor 安装程序:
public class RebusInstaller : IWindsorInstaller
{
public void Install(IWindsorContainer container, IConfigurationStore store)
{
var settings = new Settings();
Configure.With(new CastleWindsorContainerAdapter(container))
.Logging(l => l.ColoredConsole(LogLevel.Warn))
.Transport(t => t.UseRabbitMqAsOneWayClient(
settings.RabbitMQConnectionString))
.Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName))
.Options(o =>
{
o.EnableCompression();
o.EnableEncryption(settings.MessageEncryptionKey);
})
.Start();
}
}
我面临的问题是我想在我的网站 api 中使用请求/回复模式来请求联系人列表,等待包含检索到的联系人的回复和 return他们给 api 来电者。
但是如果我想为 Rebus 使用 Windsor 容器适配器,IHandlerActivator 接口不会公开允许注册内联消息处理程序的 .Handle 方法,在该方法中我从回复中获取联系人,然后将它们发回给 api 来电者。
有没有办法做到这一点,还是我处理问题的方式不正确?
编辑:如您在第一个示例中所见,我从 Windsor 容器注入 IBus 接口。 但是,如果我使用注入总线,我该如何告诉它监听从消息处理程序返回的回复?
更新:Rebus.Async 正是我要找的:https://github.com/rebus-org/Rebus.Async
当您使用真正的 IoC 容器时,您应该创建一个实现 IHandleMessages<TMessage>
的 class,例如像这样:
public class GetContactsRequestHandler : IHandleMessages<GetContactsRequest>
{
readonly IBus _bus;
public GetContactsRequestHandler(IBus bus)
{
_bus = bus;
}
public async Task Handle(GetContactsRequest request)
{
var reply = new GetContactsReply(...);
await _bus.Reply(reply);
}
}
当您向您的程序添加处理程序和辅助应用程序和域服务时,它的扩展性会更好。
这当然意味着你必须引入 Rebus.CastleWindsor NuGet 包并传递一个 WindsorContainerAdapter
给 Rebus 的 .With(...)
方法。
当您需要在 Windsor 中注册处理程序时,您可以利用 Rebus.CastleWindsor 中的扩展方法,它允许执行以下操作:
// register all handler types from this assembly
windsorContainer.AutoRegisterHandlersFromThisAssembly();
// register all handler types from typeof(SomeHandler).Assembly
windsorContainer.AutoRegisterHandlersFromAssemblyOf<SomeHandler>();
// register one particular type as a message handler
windsorContainer.RegisterHandler<GetContactsRequestHandler>();
在您添加更多信息后更新 – 您说:
(...) or am I approaching the problem incorrectly?
我会说 "yes",因为服务总线不太擅长同步 request/reply – 你应该为此使用一些东西 "request/reply-oriented",例如HTTP(*).
此外 – 我对南希几乎一无所知,但我很确定
bus.Publish(...).Wait();
会出现死锁,至少如果您在 ASP.NET.
接待南希的话你应该总是
await bus.Publish(...);
当你可以的时候,在南希你可以这样做:
Get["/v1/contact"] = async (parameters, ct) =>
{
(...)
await bus.Publish(...);
(...)
};
PS:最后一件事:只要您希望端点处理消息,请记住保持容器在身边。
在您发布的代码中,您在发布事件后立即处理 BuiltinHandlerActivator
,这意味着您很可能根本无法处理任何消息。
如果你使用Windsor,当你销毁Windsor容器时,总线也会被销毁。
(*) 虽然 Rebus 实际上有 Rebus.Async,这是一个 request/response API,可以在您的场景中使用。
不过我不建议走这条路。如果你打算这样做一次,你应该只这样做,然后再也不会:)