System.reactive - 按频率动态切换

System.reactive - dynamically switch by frequency

我有两个数据来源。

让我们想象一下:

是否有任何优雅的方法使用 system.reactive 正常从系统 A 检索数据但是当它失败(提要中没有数据)或速度变慢时,使用数据来自B系统? 我想实现某种开关,当它比 B 快时使用 A 源。我不想混合源,所以我一次只能使用 SystemA 或 SystemB。


    class PriceFeed {

        public IObservable<Price> GetPricesFeed(IObservable<PriceFromA> pricesFromA, IObservable<PriceFromB> pricesFromB)
        {


        }


        private Price Convert(PriceFromA price) { //convert }

        private Price Convert(PriceFromB price) { //convert }

    }

有趣的问题。首先要做的是编写某种频率收集函数。可能看起来像这样:

public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback)
{
    return source.GetFrequency(measuringFreq, lookback, Scheduler.Default);
}

public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler)
{
    return source.Buffer(lookback, measuringFreq, scheduler)
        .Select(l => l.Count);
}

如果 measuringFreq 是 1 秒,而 lookback 是 5 秒,这意味着每秒我们都会看到最近 5 秒内传递了多少消息的计数。快速而肮脏的例子:

var r = new System.Random();
var nums = Observable.Generate(
    0, 
    i => i < 100, 
    i => i + 1, 
    i => i, _ => TimeSpan.FromSeconds(r.NextDouble() * 1)
);
var freq = nums.GetFrequency(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5));
freq.Dump(); //Linqpad

nums 是一个 observable,它应该平均每半秒生成一条消息(它随机选择 0 到 1 秒之间的持续时间)。 freq 每秒产生一个值,该值 returns 过去 5 秒内产生的消息数 nums(平均应为 10)。在我机器上的最新 运行 上,我得到这个:

11
11
12
10
12
11
9
9
10
9
8
...

一旦我们有了获取频率的方法,您就需要编写一个函数来将两个 like-typed 可观察值合成在一起,并根据频率进行切换。我写了这个:

public static IObservable<T> MaintainFrequencyImproper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler, int aAdvantage = 0)
{
    var aFreq = sourceA.GetFrequency(measuringFreq, lookback, scheduler);
    var bFreq = sourceB.GetFrequency(measuringFreq, lookback, scheduler);

    var toReturn = aFreq.Zip(bFreq, (a, b) => a + aAdvantage - b)
        .Select(freqDifference => freqDifference < 0 ? sourceB : sourceA)   //If advantage is 0, and a & b both popped out 5 messages in the last second, then A wins
        .StartWith(sourceA)
        .Switch();

    return toReturn;
}

首先我们用 GetFrequency 得到两个可观察量的频率,然后我们将这两个压缩到一起,并比较它们。如果 B 比 A 更频繁,则使用 B。如果它们相等或 A 更频繁,则使用 A。

aAdvantage 变量允许您表达对 A 比 B 更强烈的偏好。0(默认值)表示来源 A 赢得平局,或者当它更频繁时,否则 B 获胜。 2 意味着 B 在最近一段时间内必须比 A 多生成 3 条消息才能要求使用 B。

使用适当的 Publishing 可观察对象来避免多次订阅,它看起来像这样:

public static IObservable<T> MaintainFrequencyProper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, 
    IScheduler scheduler, int aAdvantage = 0)
{
    return sourceA.Publish(_sourceA => sourceB.Publish(_sourceB => 
        _sourceA.GetFrequency(measuringFreq, lookback, scheduler)
            .Zip(_sourceB.GetFrequency(measuringFreq, lookback, scheduler), (a, b) => a + aAdvantage - b)
            .Select(freqDifference => freqDifference < 0 ? _sourceB : _sourceA)
            .StartWith(_sourceA)
            .Switch()

    ))
}

希望对您有所帮助。关于如何将其融入您的代码,您并没有留下太多内容。如果你想要那个,那么请包括一个 mcve.