Service Fabric、Akka.net 和 Persistent actor 集成
Service Fabric, Akka.net and Persistent actor integration
我有一些来自可运行示例的代码片段。该示例在 Service Fabric 中通过 REST 调用(WebAPI)接收请求,并由一个轮询器轮询这些请求。原文称共有五个 actor(下文仅列出其中四个):(1) FileImportValidator 验证文件名;(2) FileParser 解析文件;(3) AgeValidator 验证年龄;(4) FilePersistor 将姓名和年龄持久化为事件。
请告知此设计是否符合使用 Akka.NET 为事件溯源(event sourcing)系统进行 actor 建模的预期做法。
附注:要解析的文件已事先上传,REST 调用仅提供文件名。我有意省略了一些验证逻辑。
//WebAPI:
/// <summary>
/// Accepts an import request and enqueues its file name on the "inputQueue" reliable queue.
/// </summary>
/// <param name="request">Request whose <c>FileName</c> is placed on the queue.</param>
/// <returns>An OK result, returned after the enqueue transaction has been committed.</returns>
[HttpPost]
[Route("import")]
public async Task<IHttpActionResult> Import(FileImportRequest request)
{
    var inputQueue = await stateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");

    // The enqueue only becomes part of the queue once the transaction is committed.
    using (var transaction = stateManager.CreateTransaction())
    {
        await inputQueue.EnqueueAsync(transaction, request.FileName);
        await transaction.CommitAsync();
    }

    return Ok();
}
// Poller in Microsoft Service Fabric MicroService:
/// <summary>
/// Stateful service that creates the actor system and polls the "inputQueue" reliable queue,
/// sending a <c>ValidateFileCommand</c> to the FileImportValidator actor for each dequeued file name.
/// </summary>
public class FileImportMicroService : StatefulService
{
    /// <summary>
    /// Creates the "DomainActorSystem" actor system and its top-level "FileImportValidator" actor.
    /// </summary>
    public FileImportMicroService()
    {
        domainActorSystem = ActorSystem.Create("DomainActorSystem");
        fileImportValidator = domainActorSystem.ActorOf(Props.Create<FileImportValidator>(), "FileImportValidator");
    }

    /// <summary>
    /// Logs the listener creation and returns an OWIN listener that is given the service's state manager.
    /// </summary>
    protected override ICommunicationListener CreateCommunicationListener()
    {
        ServiceEventSource.Current.CreateCommunicationListener(typeof(FileImportMicroService).Name);
        return new OwinCommunicationListener(typeof(FileImportMicroService).Name, new StartUp(StateManager));
    }

    /// <summary>
    /// Polls "inputQueue" roughly once per second until cancellation is requested.
    /// </summary>
    /// <param name="cancellationToken">Token checked on every iteration and passed to the delay.</param>
    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
        var queue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");
        while (!cancellationToken.IsCancellationRequested)
        {
            using (ITransaction tx = this.StateManager.CreateTransaction())
            {
                ConditionalResult<string> dequeuReply = await queue.TryDequeueAsync(tx);
                if (dequeuReply.HasValue)
                {
                    // Send through the actor reference created in the constructor; the original
                    // called Tell on the FileImportValidator type itself, which is not an IActorRef.
                    fileImportValidator.Tell(new ValidateFileCommand(dequeuReply.Value));

                    // Only log when an item was actually dequeued; Value is meaningless otherwise.
                    ServiceEventSource.Current.Message(dequeuReply.Value);
                }

                await tx.CommitAsync();
            }

            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }
    }

    // Both fields are assigned once, in the constructor.
    private readonly ActorSystem domainActorSystem;
    private readonly IActorRef fileImportValidator;
}
//FileImportValidator Actor
/// <summary>
/// Actor that handles ValidateFileCommand messages and tells a FileParser child
/// a ValidationSuccessfulEvent carrying the command's file name.
/// </summary>
public class FileImportValidator : UntypedActor
{
// Casts the message to dynamic so the Handle overload is chosen from its runtime type.
// NOTE(review): a message type with no matching Handle overload presumably fails at
// runtime binding - confirm how such messages should be treated.
protected override void OnReceive(object message)
{
Handle((dynamic) message);
}
/// <summary>
/// Creates a FileParser child and sends it a ValidationSuccessfulEvent for the command's file.
/// </summary>
/// <param name="command">Command whose FileName is forwarded to the parser.</param>
public void Handle(ValidateFileCommand command)
{
// A new FileParser child is created on every command, replacing the stored reference.
// NOTE(review): the visible code never stops earlier children - confirm whether the elided section does.
_fileParser = Context.ActorOf(Props.Create(() => new FileParser()));
// Validation logic elided in the original snippet.
...
_fileParser.Tell(new ValidationSuccessfulEvent(command.FileName));
}
// Reference to the most recently created FileParser child.
private IActorRef _fileParser;
}
//FileParser Actor:
/// <summary>
/// Actor that reads the file named in a ValidationSuccessfulEvent line by line and
/// tells its AgeValidator child one ValidateAge message per data line.
/// </summary>
public class FileParser : UntypedActor
{
    // Child actor created in PreStart; receives one ValidateAge per parsed line.
    private IActorRef _ageValidator;

    // Casts the message to dynamic so the Handle overload is chosen from its runtime type.
    protected override void OnReceive(object message)
    {
        Handle((dynamic) message);
    }

    /// <summary>
    /// Splits each non-blank line of the file on ',' and forwards the first two columns
    /// (name, age) to the age validator.
    /// </summary>
    /// <param name="message">Event whose FileName identifies the file to read.</param>
    public void Handle(ValidationSuccessfulEvent message)
    {
        foreach (var line in File.ReadLines(message.FileName))
        {
            // A blank line has no second column; skip it instead of failing on cols[1].
            if (string.IsNullOrWhiteSpace(line))
            {
                continue;
            }

            var cols = line.Split(',');

            // Pass the columns directly: the original wrapped them in a local named "File",
            // which shadowed System.IO.File used above.
            _ageValidator.Tell(new ValidateAge(cols[0], cols[1]));
        }
    }

    /// <summary>
    /// Creates the AgeValidator child before the base start-up logic runs.
    /// </summary>
    protected override void PreStart()
    {
        _ageValidator = Context.ActorOf(Props.Create(() => new AgeValidator()));
        base.PreStart();
    }
}
//AgeValidator Actor:
/// <summary>
/// Actor that wraps each incoming ValidateAge message in a SaveNameAndAge message
/// and tells it to its FilePersistor child. Messages of any other type are ignored.
/// </summary>
public class AgeValidator : UntypedActor
{
    // Child actor named "file-persistor", created in PreStart.
    private IActorRef _filePersistor;

    /// <summary>
    /// Creates the FilePersistor child before the base start-up logic runs.
    /// </summary>
    protected override void PreStart()
    {
        _filePersistor = Context.ActorOf(Props.Create<FilePersistor>(), "file-persistor");
        base.PreStart();
    }

    /// <summary>
    /// Forwards ValidateAge messages to the persistor; returns without action for anything else.
    /// </summary>
    /// <param name="message">The received message.</param>
    protected override void OnReceive(object message)
    {
        if (!(message is ValidateAge))
        {
            return;
        }

        _filePersistor.Tell(new SaveNameAndAge(message));
    }
}
//Persistent Actor:
/// <summary>
/// Persistent actor that the AgeValidator sends SaveNameAndAge messages to.
/// Parts of the class are elided ("...") in this snippet.
/// </summary>
public class FilePersistor : PersistentActor
{
// Elided in the original snippet.
// NOTE(review): presumably PersistenceId and the recovery handler live here - confirm.
...
// Calls Persist for every incoming command and reports the message as handled.
// NOTE(review): the event and handler arguments to Persist are elided; the message type is not checked here.
protected override bool ReceiveCommand(object message)
{
Persist(/* Handler to persist name and age */);
return true;
}
// Elided in the original snippet.
...
}
您可以考虑的另一种方法是:在服务中使用 ReliableDictionary 来“持久化”系统状态(已处理的文件)。上传新文件时,创建一个新的 actor 并向其传递 FileId,以便该 actor 检索数据并进行处理。处理完成后,actor 回调服务,将该条目从列表中删除。通过这种方式,您可以并行处理多个文件。
我有一些来自可运行示例的代码片段。该示例在 Service Fabric 中通过 REST 调用(WebAPI)接收请求,并由一个轮询器轮询这些请求。原文称共有五个 actor(下文仅列出其中四个):(1) FileImportValidator 验证文件名;(2) FileParser 解析文件;(3) AgeValidator 验证年龄;(4) FilePersistor 将姓名和年龄持久化为事件。
请告知此设计是否符合使用 Akka.NET 为事件溯源(event sourcing)系统进行 actor 建模的预期做法。
附注:要解析的文件已事先上传,REST 调用仅提供文件名。我有意省略了一些验证逻辑。
//WebAPI:
/// <summary>
/// Accepts an import request and enqueues its file name on the "inputQueue" reliable queue.
/// </summary>
/// <param name="request">Request whose <c>FileName</c> is placed on the queue.</param>
/// <returns>An OK result, returned after the enqueue transaction has been committed.</returns>
[HttpPost]
[Route("import")]
public async Task<IHttpActionResult> Import(FileImportRequest request)
{
    var inputQueue = await stateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");

    // The enqueue only becomes part of the queue once the transaction is committed.
    using (var transaction = stateManager.CreateTransaction())
    {
        await inputQueue.EnqueueAsync(transaction, request.FileName);
        await transaction.CommitAsync();
    }

    return Ok();
}
// Poller in Microsoft Service Fabric MicroService:
/// <summary>
/// Stateful service that creates the actor system and polls the "inputQueue" reliable queue,
/// sending a <c>ValidateFileCommand</c> to the FileImportValidator actor for each dequeued file name.
/// </summary>
public class FileImportMicroService : StatefulService
{
    /// <summary>
    /// Creates the "DomainActorSystem" actor system and its top-level "FileImportValidator" actor.
    /// </summary>
    public FileImportMicroService()
    {
        domainActorSystem = ActorSystem.Create("DomainActorSystem");
        fileImportValidator = domainActorSystem.ActorOf(Props.Create<FileImportValidator>(), "FileImportValidator");
    }

    /// <summary>
    /// Logs the listener creation and returns an OWIN listener that is given the service's state manager.
    /// </summary>
    protected override ICommunicationListener CreateCommunicationListener()
    {
        ServiceEventSource.Current.CreateCommunicationListener(typeof(FileImportMicroService).Name);
        return new OwinCommunicationListener(typeof(FileImportMicroService).Name, new StartUp(StateManager));
    }

    /// <summary>
    /// Polls "inputQueue" roughly once per second until cancellation is requested.
    /// </summary>
    /// <param name="cancellationToken">Token checked on every iteration and passed to the delay.</param>
    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
        var queue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");
        while (!cancellationToken.IsCancellationRequested)
        {
            using (ITransaction tx = this.StateManager.CreateTransaction())
            {
                ConditionalResult<string> dequeuReply = await queue.TryDequeueAsync(tx);
                if (dequeuReply.HasValue)
                {
                    // Send through the actor reference created in the constructor; the original
                    // called Tell on the FileImportValidator type itself, which is not an IActorRef.
                    fileImportValidator.Tell(new ValidateFileCommand(dequeuReply.Value));

                    // Only log when an item was actually dequeued; Value is meaningless otherwise.
                    ServiceEventSource.Current.Message(dequeuReply.Value);
                }

                await tx.CommitAsync();
            }

            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }
    }

    // Both fields are assigned once, in the constructor.
    private readonly ActorSystem domainActorSystem;
    private readonly IActorRef fileImportValidator;
}
//FileImportValidator Actor
/// <summary>
/// Actor that handles ValidateFileCommand messages and tells a FileParser child
/// a ValidationSuccessfulEvent carrying the command's file name.
/// </summary>
public class FileImportValidator : UntypedActor
{
// Casts the message to dynamic so the Handle overload is chosen from its runtime type.
// NOTE(review): a message type with no matching Handle overload presumably fails at
// runtime binding - confirm how such messages should be treated.
protected override void OnReceive(object message)
{
Handle((dynamic) message);
}
/// <summary>
/// Creates a FileParser child and sends it a ValidationSuccessfulEvent for the command's file.
/// </summary>
/// <param name="command">Command whose FileName is forwarded to the parser.</param>
public void Handle(ValidateFileCommand command)
{
// A new FileParser child is created on every command, replacing the stored reference.
// NOTE(review): the visible code never stops earlier children - confirm whether the elided section does.
_fileParser = Context.ActorOf(Props.Create(() => new FileParser()));
// Validation logic elided in the original snippet.
...
_fileParser.Tell(new ValidationSuccessfulEvent(command.FileName));
}
// Reference to the most recently created FileParser child.
private IActorRef _fileParser;
}
//FileParser Actor:
/// <summary>
/// Actor that reads the file named in a ValidationSuccessfulEvent line by line and
/// tells its AgeValidator child one ValidateAge message per data line.
/// </summary>
public class FileParser : UntypedActor
{
    // Child actor created in PreStart; receives one ValidateAge per parsed line.
    private IActorRef _ageValidator;

    // Casts the message to dynamic so the Handle overload is chosen from its runtime type.
    protected override void OnReceive(object message)
    {
        Handle((dynamic) message);
    }

    /// <summary>
    /// Splits each non-blank line of the file on ',' and forwards the first two columns
    /// (name, age) to the age validator.
    /// </summary>
    /// <param name="message">Event whose FileName identifies the file to read.</param>
    public void Handle(ValidationSuccessfulEvent message)
    {
        foreach (var line in File.ReadLines(message.FileName))
        {
            // A blank line has no second column; skip it instead of failing on cols[1].
            if (string.IsNullOrWhiteSpace(line))
            {
                continue;
            }

            var cols = line.Split(',');

            // Pass the columns directly: the original wrapped them in a local named "File",
            // which shadowed System.IO.File used above.
            _ageValidator.Tell(new ValidateAge(cols[0], cols[1]));
        }
    }

    /// <summary>
    /// Creates the AgeValidator child before the base start-up logic runs.
    /// </summary>
    protected override void PreStart()
    {
        _ageValidator = Context.ActorOf(Props.Create(() => new AgeValidator()));
        base.PreStart();
    }
}
//AgeValidator Actor:
/// <summary>
/// Actor that wraps each incoming ValidateAge message in a SaveNameAndAge message
/// and tells it to its FilePersistor child. Messages of any other type are ignored.
/// </summary>
public class AgeValidator : UntypedActor
{
    // Child actor named "file-persistor", created in PreStart.
    private IActorRef _filePersistor;

    /// <summary>
    /// Creates the FilePersistor child before the base start-up logic runs.
    /// </summary>
    protected override void PreStart()
    {
        _filePersistor = Context.ActorOf(Props.Create<FilePersistor>(), "file-persistor");
        base.PreStart();
    }

    /// <summary>
    /// Forwards ValidateAge messages to the persistor; returns without action for anything else.
    /// </summary>
    /// <param name="message">The received message.</param>
    protected override void OnReceive(object message)
    {
        if (!(message is ValidateAge))
        {
            return;
        }

        _filePersistor.Tell(new SaveNameAndAge(message));
    }
}
//Persistent Actor:
/// <summary>
/// Persistent actor that the AgeValidator sends SaveNameAndAge messages to.
/// Parts of the class are elided ("...") in this snippet.
/// </summary>
public class FilePersistor : PersistentActor
{
// Elided in the original snippet.
// NOTE(review): presumably PersistenceId and the recovery handler live here - confirm.
...
// Calls Persist for every incoming command and reports the message as handled.
// NOTE(review): the event and handler arguments to Persist are elided; the message type is not checked here.
protected override bool ReceiveCommand(object message)
{
Persist(/* Handler to persist name and age */);
return true;
}
// Elided in the original snippet.
...
}
您可以考虑的另一种方法是:在服务中使用 ReliableDictionary 来“持久化”系统状态(已处理的文件)。上传新文件时,创建一个新的 actor 并向其传递 FileId,以便该 actor 检索数据并进行处理。处理完成后,actor 回调服务,将该条目从列表中删除。通过这种方式,您可以并行处理多个文件。