如何避免 Service Fabric 托管服务中的重复后台任务处理?
How can I avoid duplicate background task processing in Service Fabric hosted services?
抱歉标题模糊,很难解释。我有以下设置:
- 我正在 运行在 Service Fabric 中托管 .NET Core 2.2 Web API。
- 此 API 的部分职责是监视外部 FTP 存储以获取新传入文件。
- 每个文件都会触发一个 Mediator
Command
来调用处理逻辑。
- 我已经实施了一个基于 https://docs.microsoft.com/en-us/dotnet/architecture/microservices/multi-container-microservice-net-applications/background-tasks-with-ihostedservice and https://blog.maartenballiauw.be/post/2017/08/01/building-a-scheduled-cache-updater-in-aspnet-core-2.html 的混合解决方案。从本质上讲,这是一个
IHostedService
实现,在 API 的 Startup.cs
中注册。它基本上是一个后台服务 运行ning in-process.
至于问题。上面的解决方案在 1 节点集群上运行良好,但在 5 节点集群上 运行ning 时会导致处理“重复项”。问题在于,在一个 5 节点集群上,当然有 5 个相同的 ScheduledTasks
运行ning 并且都将访问 same 文件 FTP 在 相同的 时间。
我意识到这在某种程度上是由关注点分离不当引起的 - 也就是说 API 不应该对此负责,而应该由一个完全独立的进程来处理。
这让我想到了 Service fabric 支持的不同服务(有状态、无状态、Actor 和 Hosted Guest Exe)。 Actor
似乎是唯一一个 运行s single-threaded,即使在 5 节点集群上也是如此。此外,Actor
似乎不太适合这种情况,因为它需要被触发。就我而言,我基本上需要一个 运行 始终按计划运行的守护进程。如果我没记错的话,其他 stateful/stateless 服务也会 运行 有 5 个“克隆”,并且会导致与我目前遇到的问题相同的问题。
我想我的问题是:如何使用 Service Fabric 进行高效的后台处理并避免这些 multi-threaded/duplicate 问题?在此先感谢您的任何意见。
在 service farbic 中,你有 2 个 actors 选项:
您可以使用状态来确定演员是否已处理您的 ftp 文件。
看看 this blog post,看看他们如何使用每 30 秒提醒 运行。
重要的是您的 actor 中的代码允许 reantrancy。
基本上因为演员是可靠的,你的代码可能会被执行多次并在执行过程中被取消。
而不是这样做:
public void Method()
{
_ftpService.Process(file);
}
考虑这样做:
public void Method(int fileId)
{
if (_ftpService.IsNotProcessed(fileId))
{
_ftpService.Process(file);
_ftpService.SetProcessed(fileId);
}
}
如果您的 actor 在处理时遇到问题,您可能需要检查您是否在代码中处理取消令牌。我从来没有遇到过这个问题,但我们正在使用 autofac,Autofac.ServiceFabric to register our actors with RegisterActor<T>()
and we have cancelationtokens in most of our logic. Also the documentation of CancellationTokenSource 可以帮助你。
例子
public Ctor()
{
_cancelationTokenSource = new CancellationTokenSource();
_cancellationToken= _cancelationTokenSource.Token;
}
public async Task SomeMethod()
{
while(/*condition*/)
{
_cancellationToken.ThrowIfCancellationRequested();
/*Other code*/
}
}
protected override async Task OnDeactivateAsync()
{
_cancelationTokenSource.Cancel();
}
抱歉标题模糊,很难解释。我有以下设置:
- 我正在 运行在 Service Fabric 中托管 .NET Core 2.2 Web API。
- 此 API 的部分职责是监视外部 FTP 存储以获取新传入文件。
- 每个文件都会触发一个 Mediator
Command
来调用处理逻辑。 - 我已经实施了一个基于 https://docs.microsoft.com/en-us/dotnet/architecture/microservices/multi-container-microservice-net-applications/background-tasks-with-ihostedservice and https://blog.maartenballiauw.be/post/2017/08/01/building-a-scheduled-cache-updater-in-aspnet-core-2.html 的混合解决方案。从本质上讲,这是一个
IHostedService
实现,在 API 的Startup.cs
中注册。它基本上是一个后台服务 运行ning in-process.
至于问题。上面的解决方案在 1 节点集群上运行良好,但在 5 节点集群上 运行ning 时会导致处理“重复项”。问题在于,在一个 5 节点集群上,当然有 5 个相同的 ScheduledTasks
运行ning 并且都将访问 same 文件 FTP 在 相同的 时间。
我意识到这在某种程度上是由关注点分离不当引起的 - 也就是说 API 不应该对此负责,而应该由一个完全独立的进程来处理。
这让我想到了 Service fabric 支持的不同服务(有状态、无状态、Actor 和 Hosted Guest Exe)。 Actor
似乎是唯一一个 运行s single-threaded,即使在 5 节点集群上也是如此。此外,Actor
似乎不太适合这种情况,因为它需要被触发。就我而言,我基本上需要一个 运行 始终按计划运行的守护进程。如果我没记错的话,其他 stateful/stateless 服务也会 运行 有 5 个“克隆”,并且会导致与我目前遇到的问题相同的问题。
我想我的问题是:如何使用 Service Fabric 进行高效的后台处理并避免这些 multi-threaded/duplicate 问题?在此先感谢您的任何意见。
在 service farbic 中,你有 2 个 actors 选项:
您可以使用状态来确定演员是否已处理您的 ftp 文件。
看看 this blog post,看看他们如何使用每 30 秒提醒 运行。
重要的是您的 actor 中的代码允许 reantrancy。 基本上因为演员是可靠的,你的代码可能会被执行多次并在执行过程中被取消。
而不是这样做:
public void Method()
{
_ftpService.Process(file);
}
考虑这样做:
public void Method(int fileId)
{
if (_ftpService.IsNotProcessed(fileId))
{
_ftpService.Process(file);
_ftpService.SetProcessed(fileId);
}
}
如果您的 actor 在处理时遇到问题,您可能需要检查您是否在代码中处理取消令牌。我从来没有遇到过这个问题,但我们正在使用 autofac,Autofac.ServiceFabric to register our actors with RegisterActor<T>()
and we have cancelationtokens in most of our logic. Also the documentation of CancellationTokenSource 可以帮助你。
例子
public Ctor()
{
_cancelationTokenSource = new CancellationTokenSource();
_cancellationToken= _cancelationTokenSource.Token;
}
public async Task SomeMethod()
{
while(/*condition*/)
{
_cancellationToken.ThrowIfCancellationRequested();
/*Other code*/
}
}
protected override async Task OnDeactivateAsync()
{
_cancelationTokenSource.Cancel();
}