在流 (Rx) 产生的第一个值上终止循环
Terminate loop on first value produced by stream (Rx)
我有以下 Rx 方法链,我正在寻找 clean/improve 下面显示的 Select
方法,这可以用现有的 Rx 运算符完成吗,尝试使用 Amb
但是查询了所有 ISIN 和 return 产生的第一个价格。
我想要为 ISIN 生成的第一个价格,而不是查询任何其他 ISIN,换句话说,我想以串行方式调用 GetPrice - 首先尝试 ISIN,尝试第二个 ISIN,尝试第三个 ISIN 等...
public class Class1
{
public Class1()
{
GetIsins()
.Select(isins =>
{
*decimal? price = null;
foreach (var isin in isins)
{
price = GetPrice(isin)
.Take(1)
.Wait();
if (price.HasValue)
break;
}
return price;*
})
.Subscribe(price =>
{
if (price.HasValue)
Debug.WriteLine("Got a price...");
});
}
public IObservable<string[]> GetIsins()
{
return Observable.Return(new []
{
"US9311421039",
"TR9311421033",
"UK3342130394"
});
}
public IObservable<decimal?> GetPrice(string isin)
{
return Observable.Return((decimal?)100m);
}
}
假设我正确理解了问题,您可能需要将 GetPrice
操作作为订阅可观察对象的副作用。
GetIsins()
.SelectMany(isins => isins)
.Select(isin => Observable.Defer(() =>
{
return GetPrice(isin);
}))
.Concat() // or .Merge(1)
.Subscribe(price =>
{
if (price.HasValue)
Debug.WriteLine("Got a price...");
});
Concat
运算符确保序列中的每个 IObservable<decimal?>
都必须在订阅下一个之前完成。
Defer
运算符确保每个 IObservable<decimal?>
在订阅之前不会启动(换句话说,确保它是 cold 可观察到的)。
我有以下 Rx 方法链,我正在寻找 clean/improve 下面显示的 Select
方法,这可以用现有的 Rx 运算符完成吗,尝试使用 Amb
但是查询了所有 ISIN 和 return 产生的第一个价格。
我想要为 ISIN 生成的第一个价格,而不是查询任何其他 ISIN,换句话说,我想以串行方式调用 GetPrice - 首先尝试 ISIN,尝试第二个 ISIN,尝试第三个 ISIN 等...
public class Class1
{
public Class1()
{
GetIsins()
.Select(isins =>
{
*decimal? price = null;
foreach (var isin in isins)
{
price = GetPrice(isin)
.Take(1)
.Wait();
if (price.HasValue)
break;
}
return price;*
})
.Subscribe(price =>
{
if (price.HasValue)
Debug.WriteLine("Got a price...");
});
}
public IObservable<string[]> GetIsins()
{
return Observable.Return(new []
{
"US9311421039",
"TR9311421033",
"UK3342130394"
});
}
public IObservable<decimal?> GetPrice(string isin)
{
return Observable.Return((decimal?)100m);
}
}
假设我正确理解了问题,您可能需要将 GetPrice
操作作为订阅可观察对象的副作用。
GetIsins()
.SelectMany(isins => isins)
.Select(isin => Observable.Defer(() =>
{
return GetPrice(isin);
}))
.Concat() // or .Merge(1)
.Subscribe(price =>
{
if (price.HasValue)
Debug.WriteLine("Got a price...");
});
Concat
运算符确保序列中的每个 IObservable<decimal?>
都必须在订阅下一个之前完成。
Defer
运算符确保每个 IObservable<decimal?>
在订阅之前不会启动(换句话说,确保它是 cold 可观察到的)。