MassTransit 和 Service Fabric 有状态服务?
MassTransit And Service Fabric Stateful Service?
我一直在尝试制作一个网站演示,该网站使用 MassTransit 和 RabbitMQ 将 post 消息发送到 Service Fabric 上的 运行 服务作为有状态服务。
一切顺利,我的客户会 post 一条消息:
IBusControl bus = BusConfigurator.ConfigureBus();
Uri sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}" + $"{RabbitMqConstants.PeopleServiceQueue}");
ISendEndpoint endPoint = await bus.GetSendEndpoint(sendToUri);
await endPoint.Send<ICompanyRequest>(new {CompanyId = id });
我的服务结构服务中的消费者定义如下:
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
{
h.Username(RabbitMqConstants.UserName);
h.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
{
e.Consumer<PersonInformationConsumer>();
});
});
busControl.Start();
这确实允许我在我的 class 中使用消息并且我可以很好地处理它。当我们想要使用 IReliableDictonary 或 IReliableQueue 或任何需要从服务结构服务中的 RunAsync 函数引用 运行 上下文的内容时,问题就来了。
所以我的问题是,我如何配置(是否可能)MassTransit 在了解服务上下文本身的有状态服务结构服务中工作?
非常感谢。
麦克
更新
好的,如果我将注册例程指向我的消息使用者 class(例如):
,我在这方面取得了一些进展
ServiceRuntime.RegisterServiceAsync("ServiceType", context => new PersonInformationConsumer(context)).GetAwaiter().GetResult();
ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(PersonInformationConsumer).Name);
然后在我的消费者 class 中,对于我的消息,我可以执行以下操作:
internal sealed class PersonInformationConsumer : StatefulService, IConsumer<ICompanyRequest>
{
private static StatefulServiceContext _currentContext;
#region Constructors
public PersonInformationConsumer(StatefulServiceContext serviceContext) : base(serviceContext)
{
_currentContext = serviceContext;
}
public PersonInformationConsumer() : base(_currentContext)
{
}
我现在可以成功调用服务消息了:
ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);
我现在遇到的问题是试图在 IReliableDictionary 上存储一些东西,这样做会导致 "Object reference not set to an instance of an object" 错误 :( ...任何想法将不胜感激(尽管可能要到新的一年才能阅读!)
public async Task Consume(ConsumeContext<ICompanyRequest> context)
{
ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);
using (ITransaction tx = StateManager.CreateTransaction())
{
try
{
var myDictionary = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");
这是导致错误的原因....帮助! :)
您需要做更多的工作才能使 MassTransit 和有状态服务协同工作,这里有几个问题需要您自己关心。
只有有状态分区中的主服务器(n 个分区中的 n 个主服务器)才能 write/update 有状态服务,所有副本在尝试写回任何状态时都会抛出异常。所以你需要处理这个问题,从表面上看这听起来很容易,直到你考虑到由于重新平衡集群,主节点可以在集群中移动,一般服务结构应用程序的默认设置是关闭在副本上处理,只有 运行 在主服务器上工作。这一切都是由 RunAsync 方法完成的(尝试一下,运行 3 个有状态服务在 RunAsync 方法中有一些点头,然后终止 master)。
还需要考虑对数据进行分区,由于有状态服务随分区扩展,因此您需要创建一种方法将数据分发到服务总线上的单独端点,也许有一个单独的队列只监听给定的分区范围?假设您有一条 UserCreated
消息,您可以将其拆分为 country
UK
转到分区 1,US
转到分区 2 等等...
如果您只想获得一些基本的东西并 运行ning,我会把它限制在一个分区内,然后尝试将您的总线创建放在 RunAsync 中,并在请求取消后关闭总线在取消标记上。
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
{
h.Username(RabbitMqConstants.UserName);
h.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
{
// Pass in the stateful service context
e.Consumer(c => new PersonInformationConsumer(Context));
});
});
busControl.Start();
while (true)
{
if(cancellationToken.CancellationRequested)
{
//Service Fabric wants us to stop
busControl.Stop();
cancellationToken.ThrowIfCancellationRequested();
}
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
我一直在尝试制作一个网站演示,该网站使用 MassTransit 和 RabbitMQ 将 post 消息发送到 Service Fabric 上的 运行 服务作为有状态服务。
一切顺利,我的客户会 post 一条消息:
IBusControl bus = BusConfigurator.ConfigureBus();
Uri sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}" + $"{RabbitMqConstants.PeopleServiceQueue}");
ISendEndpoint endPoint = await bus.GetSendEndpoint(sendToUri);
await endPoint.Send<ICompanyRequest>(new {CompanyId = id });
我的服务结构服务中的消费者定义如下:
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
{
h.Username(RabbitMqConstants.UserName);
h.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
{
e.Consumer<PersonInformationConsumer>();
});
});
busControl.Start();
这确实允许我在我的 class 中使用消息并且我可以很好地处理它。当我们想要使用 IReliableDictonary 或 IReliableQueue 或任何需要从服务结构服务中的 RunAsync 函数引用 运行 上下文的内容时,问题就来了。
所以我的问题是,我如何配置(是否可能)MassTransit 在了解服务上下文本身的有状态服务结构服务中工作?
非常感谢。 麦克
更新 好的,如果我将注册例程指向我的消息使用者 class(例如):
,我在这方面取得了一些进展ServiceRuntime.RegisterServiceAsync("ServiceType", context => new PersonInformationConsumer(context)).GetAwaiter().GetResult();
ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(PersonInformationConsumer).Name);
然后在我的消费者 class 中,对于我的消息,我可以执行以下操作:
internal sealed class PersonInformationConsumer : StatefulService, IConsumer<ICompanyRequest>
{
private static StatefulServiceContext _currentContext;
#region Constructors
public PersonInformationConsumer(StatefulServiceContext serviceContext) : base(serviceContext)
{
_currentContext = serviceContext;
}
public PersonInformationConsumer() : base(_currentContext)
{
}
我现在可以成功调用服务消息了:
ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);
我现在遇到的问题是试图在 IReliableDictionary 上存储一些东西,这样做会导致 "Object reference not set to an instance of an object" 错误 :( ...任何想法将不胜感激(尽管可能要到新的一年才能阅读!)
public async Task Consume(ConsumeContext<ICompanyRequest> context)
{
ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);
using (ITransaction tx = StateManager.CreateTransaction())
{
try
{
var myDictionary = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");
这是导致错误的原因....帮助! :)
您需要做更多的工作才能使 MassTransit 和有状态服务协同工作,这里有几个问题需要您自己关心。
只有有状态分区中的主服务器(n 个分区中的 n 个主服务器)才能 write/update 有状态服务,所有副本在尝试写回任何状态时都会抛出异常。所以你需要处理这个问题,从表面上看这听起来很容易,直到你考虑到由于重新平衡集群,主节点可以在集群中移动,一般服务结构应用程序的默认设置是关闭在副本上处理,只有 运行 在主服务器上工作。这一切都是由 RunAsync 方法完成的(尝试一下,运行 3 个有状态服务在 RunAsync 方法中有一些点头,然后终止 master)。
还需要考虑对数据进行分区,由于有状态服务随分区扩展,因此您需要创建一种方法将数据分发到服务总线上的单独端点,也许有一个单独的队列只监听给定的分区范围?假设您有一条 UserCreated
消息,您可以将其拆分为 country
UK
转到分区 1,US
转到分区 2 等等...
如果您只想获得一些基本的东西并 运行ning,我会把它限制在一个分区内,然后尝试将您的总线创建放在 RunAsync 中,并在请求取消后关闭总线在取消标记上。
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
{
h.Username(RabbitMqConstants.UserName);
h.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
{
// Pass in the stateful service context
e.Consumer(c => new PersonInformationConsumer(Context));
});
});
busControl.Start();
while (true)
{
if(cancellationToken.CancellationRequested)
{
//Service Fabric wants us to stop
busControl.Stop();
cancellationToken.ThrowIfCancellationRequested();
}
await Task.Delay(TimeSpan.FromSeconds(1));
}
}