从定期异步请求创建可观察对象
Create observable from periodic async request
我想要一种将异步方法转换为可观察对象的通用方法。就我而言,我正在处理使用 HttpClient
从 API.
中获取数据的方法
假设我们有方法 Task<string> GetSomeData()
需要变成单个 Observable<string>
,其中值是作为以下组合生成的:
- 重复定期调用
GetSomeData()
(例如每 x 秒)
- 在任何给定时间手动触发对
GetSomeData()
的调用(例如,当用户点击刷新时)。
因为有两种方法可以触发 GetSomeData()
并发性的执行可能是一个问题。为了避免要求 GetSomeData()
是线程安全的,我想限制并发性,以便只有一个线程同时执行该方法。因此,我需要用一些策略来处理重叠的请求。我做了一个(某种)大理石图来描述问题和想要的结果
我的直觉告诉我有一个简单的方法可以做到这一点,所以请给我一些见解:)
这是我目前得到的解决方案。不幸的是,它没有解决并发问题。
public class ObservableCreationWrapper<T>
{
private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
private Func<Task<T>> _methodToCall;
private IObservable<T> _manualCalls;
public IObservable<T> Stream { get; private set; }
public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
{
_methodToCall = methodToCall;
_manualCalls = _manualCallsSubject.AsObservable()
.Select(x => Observable.FromAsync(x => methodToCall()))
.Merge(1);
Stream = Observable.FromAsync(() => _methodToCall())
.DelayRepeat(period)
.Merge(_manualCalls);
}
public void TriggerAdditionalCall()
{
_manualCallsSubject.OnNext(Unit.Default);
}
}
延迟重复的扩展方法:
static class Extensions
{
public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
.Concat(
Observable.Create<T>(async observer =>
{
await Task.Delay(delay);
observer.OnCompleted();
}))
.Repeat();
}
包含生成可观察对象方法的服务示例
class SomeService
{
private int _ticks = 0;
public async Task<string> GetSomeValueAsync()
{
//Just a hack to dermine if request was triggered manuall or by timer
var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";
//Here we have a data race! We would like to limit access to this method
var valueToReturn = $"{_ticks} ({initiatationWay})";
await Task.Delay(500);
_ticks += 1;
return valueToReturn;
}
}
这样使用(会发生数据竞争):
static async Task Main(string[] args)
{
//Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
var someService = new SomeService();
var stopwatch = Stopwatch.StartNew();
var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
observableWrapper.Stream
.Take(6)
.Subscribe(x =>
{
Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed");
});
await Task.Delay(4000);
observableWrapper.TriggerAdditionalCall();
observableWrapper.TriggerAdditionalCall();
Console.ReadLine();
}
这是我对这个问题的看法:
更新: 通过借鉴 Enigmativity 的 . The Observable.StartAsync
method handles the messy business of cancellation automatically¹, and the requirement of non-overlapping execution can be enforced simply by using a SemaphoreSlim
.
,我能够大大简化我建议的解决方案
/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
Func<bool, CancellationToken, Task<T>> functionAsync,
TimeSpan period,
out Action manualInvocation)
{
// Arguments validation omitted
var manualSubject = new Subject<bool>();
manualInvocation = () => manualSubject.OnNext(true);
return Observable.Defer(() =>
{
var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
return Observable
.Interval(period)
.Select(_ => false) // Not manual
.Merge(manualSubject)
.TakeUntil(isManual => isManual) // Stop on first manual
.Repeat() // ... and restart the timer
.Prepend(false) // Skip the initial interval delay
.Select(isManual =>
{
if (isManual)
{
// Triggered manually
return Observable.StartAsync(async ct =>
{
await semaphore.WaitAsync(ct);
try { return await functionAsync(isManual, ct); }
finally { semaphore.Release(); }
});
}
else if (semaphore.Wait(0))
{
// Triggered by the timer and semaphore acquired synchronously
return Observable
.StartAsync(ct => functionAsync(isManual, ct))
.Finally(() => semaphore.Release());
}
return null; // Otherwise ignore the signal
})
.Where(op => op != null)
.Switch(); // Pending operations are unsubscribed and canceled
});
}
out Action manualInvocation
参数是触发手动调用的机制。
用法示例:
int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
await Task.Delay(500, token);
return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();
await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);
subscription.Dispose();
输出:
19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic
使用 Scan
和 DistinctUntilChanged
运算符删除元素的技术,而先前的异步操作是 运行,是从 问题中借用的。
¹ Rx 库似乎无法令人满意地处理这种混乱的业务,因为它只是 omits disposing of the CancellationTokenSources it creates.
这是您需要的查询:
var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(1.0);
IObservable<string> query =
subject
.StartWith(Unit.Default)
.Select(x => Observable.Timer(TimeSpan.Zero, delay))
.Switch()
.SelectMany(x => Observable.FromAsync(() => GetSomeData()));
如果您在任何时候调用 subject.OnNext(Unit.Default)
,它将立即触发对 GetSomeData
的调用,然后根据 delay
中设置的 TimeSpan
重复调用。
使用 .StartWith(Unit.Default)
将设置在有订阅者时立即进行查询。
使用 .Switch()
会取消任何基于正在调用的新 subject.OnNext(Unit.Default)
的未决操作。
这应该与您的弹珠图匹配。
以上版本没有引入值之间的延迟。
版本 2 应该。
var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(5.0);
var source = Observable.FromAsync(() => GetSomeData());
IObservable<string> query =
subject
.StartWith(Unit.Default)
.Select(x => source.Expand(n => Observable.Timer(delay).SelectMany(y => source)))
.Switch();
我使用 Expand
运算符在值之间引入延迟。只要 source
只产生一个值(FromAsync
就可以),这应该可以正常工作。
我建议不要尝试取消已经开始的通话。事情会变得太乱。
如果 GetSomeValueAsync 中的逻辑涉及数据库调用 and/or web API 调用,您根本无法真正取消调用。
我认为这里的关键是确保对 GetSomeValueAsync 的所有调用都被序列化。
我根据 Enigmativity 的版本 1 创建了以下解决方案。
它在 asp.net core 3.1 的 webassembly blazor 页面上进行了测试,工作正常。
private int _ticks = 0; //simulate a resource you want serialized access
//for manual event, trigger will be 0; for Timer event, trigger will be 1,2,3...
protected async Task<string> GetSomeValueAsync(string trigger)
{
var valueToReturn = $"{DateTime.Now.Ticks.ToString()}: {_ticks.ToString()} | ({trigger})";
await Task.Delay(1000);
_ticks += 1;
return valueToReturn;
}
//define two subjects
private Subject<string> _testSubject = new Subject<string>();
private Subject<string> _getDataSubject = new Subject<string>();
//driving observable, based on Enigmativity's Version 1
var delay = TimeSpan.FromSeconds(3.0);
IObservable<string> getDataObservable =
_testSubject
.StartWith("Init")
.Select(x => Observable.Timer(TimeSpan.Zero, delay).Select(i => i.ToString()))
.Switch()
.WithLatestFrom(_getDataSubject.AsObservable().StartWith("IDLE"))
.Where(a => a.Second == "IDLE")
.Select(a => a.First);
//_disposables is CompositeDisposable defined in the page
_disposables.Add(getDataObservable.Subscribe(async t =>
{
_getDataSubject.OnNext("WORKING");
//_service.LogToConsole is my helper function to log data to console
await _service.LogToConsole(await GetSomeValueAsync(t));
_getDataSubject.OnNext("IDLE");
}));
就是这样。我用一个按钮来触发手动事件。
输出中的_ticks总是按顺序排列的,即没有发生重叠。
我想要一种将异步方法转换为可观察对象的通用方法。就我而言,我正在处理使用 HttpClient
从 API.
假设我们有方法 Task<string> GetSomeData()
需要变成单个 Observable<string>
,其中值是作为以下组合生成的:
- 重复定期调用
GetSomeData()
(例如每 x 秒) - 在任何给定时间手动触发对
GetSomeData()
的调用(例如,当用户点击刷新时)。
因为有两种方法可以触发 GetSomeData()
并发性的执行可能是一个问题。为了避免要求 GetSomeData()
是线程安全的,我想限制并发性,以便只有一个线程同时执行该方法。因此,我需要用一些策略来处理重叠的请求。我做了一个(某种)大理石图来描述问题和想要的结果
我的直觉告诉我有一个简单的方法可以做到这一点,所以请给我一些见解:)
这是我目前得到的解决方案。不幸的是,它没有解决并发问题。
public class ObservableCreationWrapper<T>
{
private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
private Func<Task<T>> _methodToCall;
private IObservable<T> _manualCalls;
public IObservable<T> Stream { get; private set; }
public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
{
_methodToCall = methodToCall;
_manualCalls = _manualCallsSubject.AsObservable()
.Select(x => Observable.FromAsync(x => methodToCall()))
.Merge(1);
Stream = Observable.FromAsync(() => _methodToCall())
.DelayRepeat(period)
.Merge(_manualCalls);
}
public void TriggerAdditionalCall()
{
_manualCallsSubject.OnNext(Unit.Default);
}
}
延迟重复的扩展方法:
static class Extensions
{
public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
.Concat(
Observable.Create<T>(async observer =>
{
await Task.Delay(delay);
observer.OnCompleted();
}))
.Repeat();
}
包含生成可观察对象方法的服务示例
class SomeService
{
private int _ticks = 0;
public async Task<string> GetSomeValueAsync()
{
//Just a hack to dermine if request was triggered manuall or by timer
var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";
//Here we have a data race! We would like to limit access to this method
var valueToReturn = $"{_ticks} ({initiatationWay})";
await Task.Delay(500);
_ticks += 1;
return valueToReturn;
}
}
这样使用(会发生数据竞争):
static async Task Main(string[] args)
{
//Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
var someService = new SomeService();
var stopwatch = Stopwatch.StartNew();
var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
observableWrapper.Stream
.Take(6)
.Subscribe(x =>
{
Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed");
});
await Task.Delay(4000);
observableWrapper.TriggerAdditionalCall();
observableWrapper.TriggerAdditionalCall();
Console.ReadLine();
}
这是我对这个问题的看法:
更新: 通过借鉴 Enigmativity 的 Observable.StartAsync
method handles the messy business of cancellation automatically¹, and the requirement of non-overlapping execution can be enforced simply by using a SemaphoreSlim
.
/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
Func<bool, CancellationToken, Task<T>> functionAsync,
TimeSpan period,
out Action manualInvocation)
{
// Arguments validation omitted
var manualSubject = new Subject<bool>();
manualInvocation = () => manualSubject.OnNext(true);
return Observable.Defer(() =>
{
var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
return Observable
.Interval(period)
.Select(_ => false) // Not manual
.Merge(manualSubject)
.TakeUntil(isManual => isManual) // Stop on first manual
.Repeat() // ... and restart the timer
.Prepend(false) // Skip the initial interval delay
.Select(isManual =>
{
if (isManual)
{
// Triggered manually
return Observable.StartAsync(async ct =>
{
await semaphore.WaitAsync(ct);
try { return await functionAsync(isManual, ct); }
finally { semaphore.Release(); }
});
}
else if (semaphore.Wait(0))
{
// Triggered by the timer and semaphore acquired synchronously
return Observable
.StartAsync(ct => functionAsync(isManual, ct))
.Finally(() => semaphore.Release());
}
return null; // Otherwise ignore the signal
})
.Where(op => op != null)
.Switch(); // Pending operations are unsubscribed and canceled
});
}
out Action manualInvocation
参数是触发手动调用的机制。
用法示例:
int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
await Task.Delay(500, token);
return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();
await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);
subscription.Dispose();
输出:
19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic
使用 Scan
和 DistinctUntilChanged
运算符删除元素的技术,而先前的异步操作是 运行,是从
¹ Rx 库似乎无法令人满意地处理这种混乱的业务,因为它只是 omits disposing of the CancellationTokenSources it creates.
这是您需要的查询:
var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(1.0);
IObservable<string> query =
subject
.StartWith(Unit.Default)
.Select(x => Observable.Timer(TimeSpan.Zero, delay))
.Switch()
.SelectMany(x => Observable.FromAsync(() => GetSomeData()));
如果您在任何时候调用 subject.OnNext(Unit.Default)
,它将立即触发对 GetSomeData
的调用,然后根据 delay
中设置的 TimeSpan
重复调用。
使用 .StartWith(Unit.Default)
将设置在有订阅者时立即进行查询。
使用 .Switch()
会取消任何基于正在调用的新 subject.OnNext(Unit.Default)
的未决操作。
这应该与您的弹珠图匹配。
以上版本没有引入值之间的延迟。
版本 2 应该。
var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(5.0);
var source = Observable.FromAsync(() => GetSomeData());
IObservable<string> query =
subject
.StartWith(Unit.Default)
.Select(x => source.Expand(n => Observable.Timer(delay).SelectMany(y => source)))
.Switch();
我使用 Expand
运算符在值之间引入延迟。只要 source
只产生一个值(FromAsync
就可以),这应该可以正常工作。
我建议不要尝试取消已经开始的通话。事情会变得太乱。 如果 GetSomeValueAsync 中的逻辑涉及数据库调用 and/or web API 调用,您根本无法真正取消调用。
我认为这里的关键是确保对 GetSomeValueAsync 的所有调用都被序列化。
我根据 Enigmativity 的版本 1 创建了以下解决方案。 它在 asp.net core 3.1 的 webassembly blazor 页面上进行了测试,工作正常。
private int _ticks = 0; //simulate a resource you want serialized access
//for manual event, trigger will be 0; for Timer event, trigger will be 1,2,3...
protected async Task<string> GetSomeValueAsync(string trigger)
{
var valueToReturn = $"{DateTime.Now.Ticks.ToString()}: {_ticks.ToString()} | ({trigger})";
await Task.Delay(1000);
_ticks += 1;
return valueToReturn;
}
//define two subjects
private Subject<string> _testSubject = new Subject<string>();
private Subject<string> _getDataSubject = new Subject<string>();
//driving observable, based on Enigmativity's Version 1
var delay = TimeSpan.FromSeconds(3.0);
IObservable<string> getDataObservable =
_testSubject
.StartWith("Init")
.Select(x => Observable.Timer(TimeSpan.Zero, delay).Select(i => i.ToString()))
.Switch()
.WithLatestFrom(_getDataSubject.AsObservable().StartWith("IDLE"))
.Where(a => a.Second == "IDLE")
.Select(a => a.First);
//_disposables is CompositeDisposable defined in the page
_disposables.Add(getDataObservable.Subscribe(async t =>
{
_getDataSubject.OnNext("WORKING");
//_service.LogToConsole is my helper function to log data to console
await _service.LogToConsole(await GetSomeValueAsync(t));
_getDataSubject.OnNext("IDLE");
}));
就是这样。我用一个按钮来触发手动事件。 输出中的_ticks总是按顺序排列的,即没有发生重叠。