System.reactive - 按频率动态切换
System.reactive - dynamically switch by frequency
我有两个数据来源。
让我们想象一下:
- 系统 A 以更高的频率提供质量更好的数据,例如
1price/1sec,但有时会出现故障,没有数据或
频率是例如 1price/20sec
- 系统 B 提供频率较低的数据,例如1 价格/10 秒
是否有任何优雅的方法使用 system.reactive 正常从系统 A 检索数据但是当它失败(提要中没有数据)或速度变慢时,使用数据来自B系统?
我想实现某种开关,当它比 B 快时使用 A 源。我不想混合源,所以我一次只能使用 SystemA 或 SystemB。
class PriceFeed {
public IObservable<Price> GetPricesFeed(IObservable<PriceFromA> pricesFromA, IObservable<PriceFromB> pricesFromB)
{
}
private Price Convert(PriceFromA price) { //convert }
private Price Convert(PriceFromB price) { //convert }
}
有趣的问题。首先要做的是编写某种频率收集函数。可能看起来像这样:
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback)
{
return source.GetFrequency(measuringFreq, lookback, Scheduler.Default);
}
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler)
{
return source.Buffer(lookback, measuringFreq, scheduler)
.Select(l => l.Count);
}
如果 measuringFreq
是 1 秒,而 lookback
是 5 秒,这意味着每秒我们都会看到最近 5 秒内传递了多少消息的计数。快速而肮脏的例子:
var r = new System.Random();
var nums = Observable.Generate(
0,
i => i < 100,
i => i + 1,
i => i, _ => TimeSpan.FromSeconds(r.NextDouble() * 1)
);
var freq = nums.GetFrequency(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5));
freq.Dump(); //Linqpad
nums
是一个 observable,它应该平均每半秒生成一条消息(它随机选择 0 到 1 秒之间的持续时间)。 freq
每秒产生一个值,该值 returns 过去 5 秒内产生的消息数 nums
(平均应为 10)。在我机器上的最新 运行 上,我得到这个:
11
11
12
10
12
11
9
9
10
9
8
...
一旦我们有了获取频率的方法,您就需要编写一个函数来将两个 like-typed 可观察值合成在一起,并根据频率进行切换。我写了这个:
public static IObservable<T> MaintainFrequencyImproper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler, int aAdvantage = 0)
{
var aFreq = sourceA.GetFrequency(measuringFreq, lookback, scheduler);
var bFreq = sourceB.GetFrequency(measuringFreq, lookback, scheduler);
var toReturn = aFreq.Zip(bFreq, (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? sourceB : sourceA) //If advantage is 0, and a & b both popped out 5 messages in the last second, then A wins
.StartWith(sourceA)
.Switch();
return toReturn;
}
首先我们用 GetFrequency
得到两个可观察量的频率,然后我们将这两个压缩到一起,并比较它们。如果 B 比 A 更频繁,则使用 B。如果它们相等或 A 更频繁,则使用 A。
aAdvantage
变量允许您表达对 A 比 B 更强烈的偏好。0(默认值)表示来源 A 赢得平局,或者当它更频繁时,否则 B 获胜。 2 意味着 B 在最近一段时间内必须比 A 多生成 3 条消息才能要求使用 B。
使用适当的 Publishing
可观察对象来避免多次订阅,它看起来像这样:
public static IObservable<T> MaintainFrequencyProper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback,
IScheduler scheduler, int aAdvantage = 0)
{
return sourceA.Publish(_sourceA => sourceB.Publish(_sourceB =>
_sourceA.GetFrequency(measuringFreq, lookback, scheduler)
.Zip(_sourceB.GetFrequency(measuringFreq, lookback, scheduler), (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? _sourceB : _sourceA)
.StartWith(_sourceA)
.Switch()
))
}
希望对您有所帮助。关于如何将其融入您的代码,您并没有留下太多内容。如果你想要那个,那么请包括一个 mcve.
我有两个数据来源。
让我们想象一下:
- 系统 A 以更高的频率提供质量更好的数据,例如
1price/1sec,但有时会出现故障,没有数据或
频率是例如 1price/20sec - 系统 B 提供频率较低的数据,例如1 价格/10 秒
是否有任何优雅的方法使用 system.reactive 正常从系统 A 检索数据但是当它失败(提要中没有数据)或速度变慢时,使用数据来自B系统? 我想实现某种开关,当它比 B 快时使用 A 源。我不想混合源,所以我一次只能使用 SystemA 或 SystemB。
class PriceFeed {
public IObservable<Price> GetPricesFeed(IObservable<PriceFromA> pricesFromA, IObservable<PriceFromB> pricesFromB)
{
}
private Price Convert(PriceFromA price) { //convert }
private Price Convert(PriceFromB price) { //convert }
}
有趣的问题。首先要做的是编写某种频率收集函数。可能看起来像这样:
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback)
{
return source.GetFrequency(measuringFreq, lookback, Scheduler.Default);
}
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler)
{
return source.Buffer(lookback, measuringFreq, scheduler)
.Select(l => l.Count);
}
如果 measuringFreq
是 1 秒,而 lookback
是 5 秒,这意味着每秒我们都会看到最近 5 秒内传递了多少消息的计数。快速而肮脏的例子:
var r = new System.Random();
var nums = Observable.Generate(
0,
i => i < 100,
i => i + 1,
i => i, _ => TimeSpan.FromSeconds(r.NextDouble() * 1)
);
var freq = nums.GetFrequency(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5));
freq.Dump(); //Linqpad
nums
是一个 observable,它应该平均每半秒生成一条消息(它随机选择 0 到 1 秒之间的持续时间)。 freq
每秒产生一个值,该值 returns 过去 5 秒内产生的消息数 nums
(平均应为 10)。在我机器上的最新 运行 上,我得到这个:
11
11
12
10
12
11
9
9
10
9
8
...
一旦我们有了获取频率的方法,您就需要编写一个函数来将两个 like-typed 可观察值合成在一起,并根据频率进行切换。我写了这个:
public static IObservable<T> MaintainFrequencyImproper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler, int aAdvantage = 0)
{
var aFreq = sourceA.GetFrequency(measuringFreq, lookback, scheduler);
var bFreq = sourceB.GetFrequency(measuringFreq, lookback, scheduler);
var toReturn = aFreq.Zip(bFreq, (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? sourceB : sourceA) //If advantage is 0, and a & b both popped out 5 messages in the last second, then A wins
.StartWith(sourceA)
.Switch();
return toReturn;
}
首先我们用 GetFrequency
得到两个可观察量的频率,然后我们将这两个压缩到一起,并比较它们。如果 B 比 A 更频繁,则使用 B。如果它们相等或 A 更频繁,则使用 A。
aAdvantage
变量允许您表达对 A 比 B 更强烈的偏好。0(默认值)表示来源 A 赢得平局,或者当它更频繁时,否则 B 获胜。 2 意味着 B 在最近一段时间内必须比 A 多生成 3 条消息才能要求使用 B。
使用适当的 Publishing
可观察对象来避免多次订阅,它看起来像这样:
public static IObservable<T> MaintainFrequencyProper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback,
IScheduler scheduler, int aAdvantage = 0)
{
return sourceA.Publish(_sourceA => sourceB.Publish(_sourceB =>
_sourceA.GetFrequency(measuringFreq, lookback, scheduler)
.Zip(_sourceB.GetFrequency(measuringFreq, lookback, scheduler), (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? _sourceB : _sourceA)
.StartWith(_sourceA)
.Switch()
))
}
希望对您有所帮助。关于如何将其融入您的代码,您并没有留下太多内容。如果你想要那个,那么请包括一个 mcve.