使用 Rx .Net 在一些不活动后执行方法
Execute a method after some inactivity using Rx .Net
我有一个这样的控制器操作:
[HttpPost("Post")]
public async Task Post([FromBody] UpdateDataCommand command)
{
await _mediator.Send(command);
}
在.Net Core中完成,使用MediatR处理命令。
现在,UpdateDataCommand 有一个整数 StationId 属性 来标识站号。
当客户端应用程序通过执行 Post 调用此方法时,它会更新数据库中的数据。
我想使用 Rx .Net 做的是在 Await _mediator.Send(command) 之后以某种方式启动计时器。计时器将设置为 1 分钟。 1 分钟后,我想调用另一个方法来设置数据库中的标志,但仅限于此 StationId。如果有人使用相同的 StationId 执行 Post,则计时器应自行重置。
伪代码如下所示:
[HttpPost("Post")]
public async Task Post([FromBody] UpdateDataCommand command)
{
int stationId = command.StationId;
// let's assume stationId==2
//saves data for stationId==2
await _mediator.Send(command);
//Start a timer of 1 min
//if timer fires (meaning the 1 minute has passed) call Method2();
//if client does another "Post" for stationId==2 in the meantime
(let's say that the client does another "Post" for stationId==2 after 20 sec)
then reset the timer
}
如何使用 Reactive Extensions in.Net?
更新(@Enigmativity):
它仍然不起作用,我将计时器设置为 10 秒,如果您查看输出时间,您会看到我在 09:17:49 上创建了一个 Post(它启动了 10 秒的计时器) ,然后我在 09:17:55 处创建了一个新的 Post (它启动了另一个计时器,但它应该只重置了旧计时器)并且在第一次调用后 10 秒启动了计时器,并且第二次通话后又过了 10 秒:
我无法对此进行测试,但我认为这非常接近:
private Subject<UpdateDataCommand> posted = new Subject<UpdateDataCommand>();
private void PostInitialize()
{
posted
.GroupBy(x => x.StationId)
.Select(gxs =>
gxs
.Select(x =>
Observable
.Timer(TimeSpan.FromMinutes(1.0))
.Select(_ => x))
.Switch())
.Merge()
.Subscribe(stationId =>
{
/* update database */
});
}
public async Task Post(UpdateDataCommand command)
{
int stationId = command.StationId;
await _mediator.Send(command);
posted.OnNext(command);
}
让我知道这是否接近。
您必须先调用 PostInitialize
进行设置,然后才能开始发布更新数据命令。
这是一个表明这有效的测试:
var rnd = new Random();
var posted =
Observable
.Generate(
0, x => x < 20, x => x + 1, x => x,
x => TimeSpan.FromSeconds(rnd.NextDouble()));
posted
.GroupBy(x => x % 3)
.Select(gxs =>
gxs
.Select(x =>
Observable
.Timer(TimeSpan.FromSeconds(1.9))
.Select(_ => x))
.Switch())
.Merge()
.Subscribe(x => Console.WriteLine(x));
我得到的结果如下:
3
4
14
15
17
18
19
因为我使用了 .GroupBy(x => x % 3)
这将始终输出 17
、18
、& 19
- 但如果随机间隔足够大,则会输出较早的数字.
要使用 Rx.Net
启动计时器,我们可以调用:
var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
.Subscribe(
value =>{ /* ... */ }
);
要取消这个订阅,我们只需要稍后处理这个订阅:
subscription.Dispose();
问题是如何保持订阅。一种方法是创建一个 SubscriptionManager
服务(单例),因此我们可以调用这样的服务来安排任务,然后稍后在控制器操作中取消它,如下所示:
// you controller class
private readonly ILogger<HomeController> _logger; // injected by DI
private readonly SubscriptionManager _subscriptionMgr; // injected by DI
public async Task Post(...)
{
...
// saves data for #stationId
// Start a timer of 1 min
this._subscriptionMgr.ScheduleForStationId(stationId); // schedule a task that for #stationId that will be executed in 60s
}
[HttpPost("/Command2")]
public async Task Command2(...)
{
int stationId = command.StationId;
if( shouldCancel ){
this._subscriptionMgr.CancelForStationId(stationId); // cancel previous task for #stationId
}
}
如果您想在内存中管理订阅,我们可以使用ConcurrentDictionary
来存储订阅:
public class SubscriptionManager : IDisposable
{
private ConcurrentDictionary<string,IDisposable> _dict;
private readonly IServiceProvider _sp;
private readonly ILogger<SubscriptionManager> _logger;
public SubscriptionManager(IServiceProvider sp, ILogger<SubscriptionManager> logger)
{
this._dict= new ConcurrentDictionary<string,IDisposable>();
this._sp = sp;
this._logger = logger;
}
public IDisposable ScheduleForStationId(int stationId)
{
var timeout = 60;
this._logger.LogWarning($"Task for Station#{stationId} will be exexuted in {timeout}s") ;
var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
.Subscribe(
value =>{
// if you need update the db, create a new scope:
using(var scope = this._sp.CreateScope()){
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
.FirstOrDefault();
station.Note = "updated";
dbContext.SaveChanges();
}
this._logger.LogWarning($"Task for Station#{stationId} has been executed") ;
},
e =>{
Console.WriteLine("Error!"+ e.Message);
}
);
this._dict.AddOrUpdate( stationId.ToString(), subscription , (id , sub)=> {
sub.Dispose(); // dispose the old one
return subscription;
});
return subscription;
}
public void CancelForStationId(int stationId)
{
IDisposable subscription = null;
this._dict.TryGetValue(stationId.ToString(), out subscription);
this._logger.LogWarning($"Task for station#{stationId} has been canceled");
subscription?.Dispose();
// ... if you want to update the db , create a new scope
using(var scope = this._sp.CreateScope()){
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
.FirstOrDefault();
station.Note = "canceled";
dbContext.SaveChanges();
this._logger.LogWarning("The db has been changed");
}
}
public void Dispose()
{
foreach(var entry in this._dict){
entry.Value.Dispose();
}
}
}
另一种方法是为任务管理器创建一个平面记录(如cron
),但它根本不会使用Rx.NET。
我有一个这样的控制器操作:
[HttpPost("Post")]
public async Task Post([FromBody] UpdateDataCommand command)
{
await _mediator.Send(command);
}
在.Net Core中完成,使用MediatR处理命令。
现在,UpdateDataCommand 有一个整数 StationId 属性 来标识站号。 当客户端应用程序通过执行 Post 调用此方法时,它会更新数据库中的数据。 我想使用 Rx .Net 做的是在 Await _mediator.Send(command) 之后以某种方式启动计时器。计时器将设置为 1 分钟。 1 分钟后,我想调用另一个方法来设置数据库中的标志,但仅限于此 StationId。如果有人使用相同的 StationId 执行 Post,则计时器应自行重置。
伪代码如下所示:
[HttpPost("Post")]
public async Task Post([FromBody] UpdateDataCommand command)
{
int stationId = command.StationId;
// let's assume stationId==2
//saves data for stationId==2
await _mediator.Send(command);
//Start a timer of 1 min
//if timer fires (meaning the 1 minute has passed) call Method2();
//if client does another "Post" for stationId==2 in the meantime
(let's say that the client does another "Post" for stationId==2 after 20 sec)
then reset the timer
}
如何使用 Reactive Extensions in.Net?
更新(@Enigmativity):
它仍然不起作用,我将计时器设置为 10 秒,如果您查看输出时间,您会看到我在 09:17:49 上创建了一个 Post(它启动了 10 秒的计时器) ,然后我在 09:17:55 处创建了一个新的 Post (它启动了另一个计时器,但它应该只重置了旧计时器)并且在第一次调用后 10 秒启动了计时器,并且第二次通话后又过了 10 秒:
我无法对此进行测试,但我认为这非常接近:
private Subject<UpdateDataCommand> posted = new Subject<UpdateDataCommand>();
private void PostInitialize()
{
posted
.GroupBy(x => x.StationId)
.Select(gxs =>
gxs
.Select(x =>
Observable
.Timer(TimeSpan.FromMinutes(1.0))
.Select(_ => x))
.Switch())
.Merge()
.Subscribe(stationId =>
{
/* update database */
});
}
public async Task Post(UpdateDataCommand command)
{
int stationId = command.StationId;
await _mediator.Send(command);
posted.OnNext(command);
}
让我知道这是否接近。
您必须先调用 PostInitialize
进行设置,然后才能开始发布更新数据命令。
这是一个表明这有效的测试:
var rnd = new Random();
var posted =
Observable
.Generate(
0, x => x < 20, x => x + 1, x => x,
x => TimeSpan.FromSeconds(rnd.NextDouble()));
posted
.GroupBy(x => x % 3)
.Select(gxs =>
gxs
.Select(x =>
Observable
.Timer(TimeSpan.FromSeconds(1.9))
.Select(_ => x))
.Switch())
.Merge()
.Subscribe(x => Console.WriteLine(x));
我得到的结果如下:
3 4 14 15 17 18 19
因为我使用了 .GroupBy(x => x % 3)
这将始终输出 17
、18
、& 19
- 但如果随机间隔足够大,则会输出较早的数字.
要使用 Rx.Net
启动计时器,我们可以调用:
var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
.Subscribe(
value =>{ /* ... */ }
);
要取消这个订阅,我们只需要稍后处理这个订阅:
subscription.Dispose();
问题是如何保持订阅。一种方法是创建一个 SubscriptionManager
服务(单例),因此我们可以调用这样的服务来安排任务,然后稍后在控制器操作中取消它,如下所示:
// you controller class
private readonly ILogger<HomeController> _logger; // injected by DI
private readonly SubscriptionManager _subscriptionMgr; // injected by DI
public async Task Post(...)
{
...
// saves data for #stationId
// Start a timer of 1 min
this._subscriptionMgr.ScheduleForStationId(stationId); // schedule a task that for #stationId that will be executed in 60s
}
[HttpPost("/Command2")]
public async Task Command2(...)
{
int stationId = command.StationId;
if( shouldCancel ){
this._subscriptionMgr.CancelForStationId(stationId); // cancel previous task for #stationId
}
}
如果您想在内存中管理订阅,我们可以使用ConcurrentDictionary
来存储订阅:
public class SubscriptionManager : IDisposable
{
private ConcurrentDictionary<string,IDisposable> _dict;
private readonly IServiceProvider _sp;
private readonly ILogger<SubscriptionManager> _logger;
public SubscriptionManager(IServiceProvider sp, ILogger<SubscriptionManager> logger)
{
this._dict= new ConcurrentDictionary<string,IDisposable>();
this._sp = sp;
this._logger = logger;
}
public IDisposable ScheduleForStationId(int stationId)
{
var timeout = 60;
this._logger.LogWarning($"Task for Station#{stationId} will be exexuted in {timeout}s") ;
var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
.Subscribe(
value =>{
// if you need update the db, create a new scope:
using(var scope = this._sp.CreateScope()){
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
.FirstOrDefault();
station.Note = "updated";
dbContext.SaveChanges();
}
this._logger.LogWarning($"Task for Station#{stationId} has been executed") ;
},
e =>{
Console.WriteLine("Error!"+ e.Message);
}
);
this._dict.AddOrUpdate( stationId.ToString(), subscription , (id , sub)=> {
sub.Dispose(); // dispose the old one
return subscription;
});
return subscription;
}
public void CancelForStationId(int stationId)
{
IDisposable subscription = null;
this._dict.TryGetValue(stationId.ToString(), out subscription);
this._logger.LogWarning($"Task for station#{stationId} has been canceled");
subscription?.Dispose();
// ... if you want to update the db , create a new scope
using(var scope = this._sp.CreateScope()){
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
.FirstOrDefault();
station.Note = "canceled";
dbContext.SaveChanges();
this._logger.LogWarning("The db has been changed");
}
}
public void Dispose()
{
foreach(var entry in this._dict){
entry.Value.Dispose();
}
}
}
另一种方法是为任务管理器创建一个平面记录(如cron
),但它根本不会使用Rx.NET。