简单的 Rx 示例要么阻塞要么有竞争条件

Simple Rx example either blocks or has race condition

观看后:Intro to Reactive Programming 我正在尝试使用 C# 和 reactiveX 复制该示例。总之,它是一个货币转换器。用户指定他们想要的货币数量和类型。当前汇率是从服务器检索的,到 return 需要一定的时间。由于这个可变的服务器时间,旧的货币请求可能会在更新的请求之后出现。这个警告被用作使事情反应以避免竞争条件错误的动机。我可以通过在代表货币汇率的任务上使用 Wait() 来避免我的反应代码中的竞争条件,但这会阻止 GUI。我应该如何处理这个问题?也许我应该取消其他任务?下面是我的代码,它没有阻塞但有竞争条件问题。

public CurrencyWindow()
{
    InitializeComponent();

    var amount = Observable.FromEventPattern<TextChangedEventArgs>(
        txtAmount, "TextChanged").Select(t=>int.Parse(txtAmount.Text));

    var yen = Observable.FromEventPattern<RoutedEventArgs>(
        rdoYen, "Checked").Select(t => "Yen");
    var dollar = Observable.FromEventPattern<RoutedEventArgs>(
        rdoDollar, "Checked").Select(t => "Dollar");
    var pound = Observable.FromEventPattern<RoutedEventArgs>(
        rdoPound, "Checked").Select(t => "Pound");

    // merge the currency selection into one stream.
    var currencyType = yen.Merge(dollar).Merge(pound);

    // Lets now get a stream of currency rates.
    var exchangeRate = currencyType.Select(s => GetExchangeRate(s));


    amount.CombineLatest(exchangeRate, async (i, d) => i * await d).Subscribe(async v =>
    {
        var val = await v;
        txtCost.Text = val.ToString();
    });
}

private Task<double> GetExchangeRate(string currency)
{
    double exchange = 0;
    switch(currency)
    {
        case "Yen":
            exchange = 1;
            break;
        case "Pound":
            exchange = 2;
            break;
        case "Dollar":
            exchange = 3;
            break;
    }

    return Task.Factory.StartNew(() => 
    {
        System.Threading.Thread.Sleep((int)exchange * 3000);
        return exchange;
    });
}

试一试这段代码,看看它是否有效:

var exchangeRate = currencyType
    .Select(s => Observable.FromAsync(() => GetExchangeRate(s)))
    .Switch();

amount
    .CombineLatest(exchangeRate, (i, d) => i * d)
    .Subscribe(v =>
    {
        var val = v;
        txtCost.Text = val.ToString();
    });

此代码将确保只有最新的 currencyType 产生的 return 汇率。如果出现一种新货币,那么所有之前对 GetExchangeRate 的飞行调用都将被忽略(即使它们在当前货币之后进入)。