对具有约束和暂停支持的每个 DynamicData SourceCache 项目进行操作
Act on each DynamicData SourceCache items with constraint and pause support
我在这里要做的是迭代 SourceCache
的每个项目,每个 运行 之间有一点时间,并对每个项目做一些异步工作。每个项目必须一个接一个地处理,工作不能重叠。我在每个 运行 之后加了一点限制。源缓存数据随时都可能变化,并不是像例子中那样固定的数据。这个例子运行良好,但我需要一种使用 IObservable<bool>
暂停处理的方法,它在可以处理更多工作时发送 true,否则发送 false。
我不知道这是否是解决此问题的最佳方法,但也许还有另一种方法可以做到这一点。如果可能,尽量坚持反应式代码。
附带说明一下,我使用的是 Net Framework 4.6.2 和 DynamicData 7.1.1,最重要的是我真的是编程新手(只是为了好玩)和 Rx.Net.
var data = Observable.Range(0, 20).Select(value => new Work(value));
ISourceCache<Work, int> source = new SourceCache<Work, int>(x => x.Id);
using var populate = source.PopulateFrom(data);
var observ = source.Connect().AsObservableCache();
using var _ = observ.Connect()
.AutoRefresh(x => x.Status)
.Filter(x => x.Status == Status.Queue)
.ToCollection()
.Select(collection => collection.ToObservable())
.Switch()
.Take(1)
// How to pause just before status change
.Do(work => work.Status = Status.Running)
.Select(x => Run(x).ToObservable())
.Concat()
.Concat(Observable.Empty<Work>().Delay(TimeSpan.FromMilliseconds(250)))
.Repeat()
.Subscribe();
using var ignore = toggle.Connect();
private static async Task<Work> Run(Work work)
{
// This is fixed but the time needed to run each work is not constant
await Task.Delay(TimeSpan.FromSeconds(1));
return work;
}
public enum Status
{
Queue,
Running
}
public class Work : AbstractNotifyPropertyChanged
{
private Status _status;
public Work(int id)
{
Id = id;
Status = Status.Queue;
}
public int Id { get; }
public Status Status
{
get => _status;
set => SetAndRaise(ref _status, value);
}
public override string ToString()
{
return $"Id: {Id:000} - Status: {Status}";
}
}
经过一些脑筋急转弯的会议后,我得出了一些可以解决我的问题的方法。
var toggle = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5))
.SwitchState(false)
.Do(Console.WriteLine)
.Replay(1);
var firstWork = observ.Connect()
.AutoRefresh(x => x.Status)
.Filter(x => x.Status == Status.Queue)
.ToCollection()
.Select(collection => collection.FirstOrOptional(x => true))
.StartWith(Optional<Work>.None);
var getWork = Observable.Timer(TimeSpan.FromMilliseconds(250))
.WithLatestFrom(firstWork, (_, first) => first);
var selector = toggle.Select(value => value ?
getWork :
Observable.Empty<Optional<Work>>())
.Switch()
.Take(1)
.Where(optional => optional.HasValue)
.Select(optional => optional.Value)
.Do(Console.WriteLine)
.Do(work => work.Status = Status.Running)
.Select(x => Run(x).ToObservable())
.Concat()
.Do(Console.WriteLine)
.Repeat();
using var executing = selector.Subscribe();
using var pauseChange = toggle.Connect();
toggle
每 5 秒模拟一次暂停和取消暂停,但在我的例子中是切换按钮的状态。
暂停时 selector
return 一个空的可观察对象,没有真正的工作要做。
当取消暂停时,流在初始到期时间后从 collection 获得一个且只有一个 Work
。然后启动后台作业直到完成,然后一切重新开始。
在我的真实案例中,我从一个空的 collection 开始,这就是为什么我必须对 DynamicData 的 Optional<T>
实现施展魔法。
我认为这解决了我的问题,但我对另一种解决方案持开放态度。
也许您可以使用 BatchIf 暂停更新来稍微简化代码。
/// <summary>
/// Batches the underlying updates if a pause signal (i.e when the buffer selector return true) has been received.
/// When a resume signal has been received the batched updates will be fired.
/// </summary>
/// <typeparam name="TObject">The type of the object.</typeparam>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <param name="source">The source.</param>
/// <param name="pauseIfTrueSelector">When true, observable begins to buffer and when false, window closes and buffered result if notified.</param>
/// <param name="scheduler">The scheduler.</param>
/// <returns>An observable which emits change sets.</returns>
/// <exception cref="System.ArgumentNullException">source.</exception>
public static IObservable<IChangeSet<TObject, TKey>> BatchIf<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source, IObservable<bool> pauseIfTrueSelector, IScheduler? scheduler = null)
where TKey : notnull
{
return BatchIf(source, pauseIfTrueSelector, false, scheduler);
}
我在这里要做的是迭代 SourceCache
的每个项目,每个 运行 之间有一点时间,并对每个项目做一些异步工作。每个项目必须一个接一个地处理,工作不能重叠。我在每个 运行 之后加了一点限制。源缓存数据随时都可能变化,并不是像例子中那样固定的数据。这个例子运行良好,但我需要一种使用 IObservable<bool>
暂停处理的方法,它在可以处理更多工作时发送 true,否则发送 false。
我不知道这是否是解决此问题的最佳方法,但也许还有另一种方法可以做到这一点。如果可能,尽量坚持反应式代码。
附带说明一下,我使用的是 Net Framework 4.6.2 和 DynamicData 7.1.1,最重要的是我真的是编程新手(只是为了好玩)和 Rx.Net.
var data = Observable.Range(0, 20).Select(value => new Work(value));
ISourceCache<Work, int> source = new SourceCache<Work, int>(x => x.Id);
using var populate = source.PopulateFrom(data);
var observ = source.Connect().AsObservableCache();
using var _ = observ.Connect()
.AutoRefresh(x => x.Status)
.Filter(x => x.Status == Status.Queue)
.ToCollection()
.Select(collection => collection.ToObservable())
.Switch()
.Take(1)
// How to pause just before status change
.Do(work => work.Status = Status.Running)
.Select(x => Run(x).ToObservable())
.Concat()
.Concat(Observable.Empty<Work>().Delay(TimeSpan.FromMilliseconds(250)))
.Repeat()
.Subscribe();
using var ignore = toggle.Connect();
private static async Task<Work> Run(Work work)
{
// This is fixed but the time needed to run each work is not constant
await Task.Delay(TimeSpan.FromSeconds(1));
return work;
}
public enum Status
{
Queue,
Running
}
public class Work : AbstractNotifyPropertyChanged
{
private Status _status;
public Work(int id)
{
Id = id;
Status = Status.Queue;
}
public int Id { get; }
public Status Status
{
get => _status;
set => SetAndRaise(ref _status, value);
}
public override string ToString()
{
return $"Id: {Id:000} - Status: {Status}";
}
}
经过一些脑筋急转弯的会议后,我得出了一些可以解决我的问题的方法。
var toggle = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5))
.SwitchState(false)
.Do(Console.WriteLine)
.Replay(1);
var firstWork = observ.Connect()
.AutoRefresh(x => x.Status)
.Filter(x => x.Status == Status.Queue)
.ToCollection()
.Select(collection => collection.FirstOrOptional(x => true))
.StartWith(Optional<Work>.None);
var getWork = Observable.Timer(TimeSpan.FromMilliseconds(250))
.WithLatestFrom(firstWork, (_, first) => first);
var selector = toggle.Select(value => value ?
getWork :
Observable.Empty<Optional<Work>>())
.Switch()
.Take(1)
.Where(optional => optional.HasValue)
.Select(optional => optional.Value)
.Do(Console.WriteLine)
.Do(work => work.Status = Status.Running)
.Select(x => Run(x).ToObservable())
.Concat()
.Do(Console.WriteLine)
.Repeat();
using var executing = selector.Subscribe();
using var pauseChange = toggle.Connect();
toggle
每 5 秒模拟一次暂停和取消暂停,但在我的例子中是切换按钮的状态。
暂停时 selector
return 一个空的可观察对象,没有真正的工作要做。
当取消暂停时,流在初始到期时间后从 collection 获得一个且只有一个 Work
。然后启动后台作业直到完成,然后一切重新开始。
在我的真实案例中,我从一个空的 collection 开始,这就是为什么我必须对 DynamicData 的 Optional<T>
实现施展魔法。
我认为这解决了我的问题,但我对另一种解决方案持开放态度。
也许您可以使用 BatchIf 暂停更新来稍微简化代码。
/// <summary>
/// Batches the underlying updates if a pause signal (i.e when the buffer selector return true) has been received.
/// When a resume signal has been received the batched updates will be fired.
/// </summary>
/// <typeparam name="TObject">The type of the object.</typeparam>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <param name="source">The source.</param>
/// <param name="pauseIfTrueSelector">When true, observable begins to buffer and when false, window closes and buffered result if notified.</param>
/// <param name="scheduler">The scheduler.</param>
/// <returns>An observable which emits change sets.</returns>
/// <exception cref="System.ArgumentNullException">source.</exception>
public static IObservable<IChangeSet<TObject, TKey>> BatchIf<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source, IObservable<bool> pauseIfTrueSelector, IScheduler? scheduler = null)
where TKey : notnull
{
return BatchIf(source, pauseIfTrueSelector, false, scheduler);
}