rx.net 锁定 ToEnumerable 的使用
rx.net locking up from use of ToEnumerable
我正在尝试转换以下语句,以便我可以在所选列表旁边获取密钥:
var feed = new Subject<TradeExecuted>();
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(x => x.ToList())
.Select(trades => Observable.FromAsync(() => Mediator.Publish(trades, cts.Token)))
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
以上有效,而以下无效 - 当我尝试遍历列表时,它锁定了。
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(timespan))
.Select(x => Observable.FromAsync(() =>
{
var list = x.ToEnumerable(); // <---- LOCK UP if we use list.First() etc
var aggregate = AggregateTrades(x.Key.Symbol, x.Key.AccountId, x.Key.Tenant, list);
return Mediator.Publish(aggregate, cts.Token);
}))
.Concat()
.Subscribe(cts.Token); // Check status of calls.
我显然做错了,而且可能很可怕!
回到最初的代码,我怎样才能获得可枚举列表旁边的密钥(并避免下面的 hack)?
作为旁注,下面的代码有效,但它是一个讨厌的 hack,我从第一个列表项中获取密钥:
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(x => x.ToList())
.Select(trades => Observable.FromAsync(() =>
{
var firstTrade = trades.First();
var aggregate = AggregateTrades(firstTrade.Execution.Contract.Symbol, firstTrade.Execution.AccountId, firstTrade.Tenant, trades);
return Mediator.Publish(aggregate, cts.Token);
}))
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
您的代码的所有版本都试图急切地评估分组 sub-observable。由于在 v1 和 v3 中,您的组可观察性将 运行 最多 5 秒,这不是 horrible/awful,但它仍然不是很好。在 v2 中,我不知道 timespan
是什么,但假设它是 5 秒,你有同样的问题:试图将分组的 sub-observable 变成列表或可枚举意味着等待 sub-observable 完成,阻塞线程(或任务)。
您可以通过使用 Buffer
运算符延迟计算分组 sub-observable:
来解决此问题
var timespan = TimeSpan.FromSeconds(5);
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(timespan))
.SelectMany(x => x
.Buffer(timespan)
.Select(list => Observable.FromAsync(() =>
{
var aggregate = AggregateTrades(x.Key.Symbol, x.Key.AccountId, x.Key.Tenant, list));
return Mediator.Publish(aggregate, cts.Token);
}))
)
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
这实质上意味着在 timespan
结束之前,组中的项目将聚集在 Buffer
内的列表中。一旦 timespan
启动,它们就会作为列表发布,然后调解器发布。
我正在尝试转换以下语句,以便我可以在所选列表旁边获取密钥:
var feed = new Subject<TradeExecuted>();
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(x => x.ToList())
.Select(trades => Observable.FromAsync(() => Mediator.Publish(trades, cts.Token)))
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
以上有效,而以下无效 - 当我尝试遍历列表时,它锁定了。
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(timespan))
.Select(x => Observable.FromAsync(() =>
{
var list = x.ToEnumerable(); // <---- LOCK UP if we use list.First() etc
var aggregate = AggregateTrades(x.Key.Symbol, x.Key.AccountId, x.Key.Tenant, list);
return Mediator.Publish(aggregate, cts.Token);
}))
.Concat()
.Subscribe(cts.Token); // Check status of calls.
我显然做错了,而且可能很可怕!
回到最初的代码,我怎样才能获得可枚举列表旁边的密钥(并避免下面的 hack)?
作为旁注,下面的代码有效,但它是一个讨厌的 hack,我从第一个列表项中获取密钥:
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(x => x.ToList())
.Select(trades => Observable.FromAsync(() =>
{
var firstTrade = trades.First();
var aggregate = AggregateTrades(firstTrade.Execution.Contract.Symbol, firstTrade.Execution.AccountId, firstTrade.Tenant, trades);
return Mediator.Publish(aggregate, cts.Token);
}))
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
您的代码的所有版本都试图急切地评估分组 sub-observable。由于在 v1 和 v3 中,您的组可观察性将 运行 最多 5 秒,这不是 horrible/awful,但它仍然不是很好。在 v2 中,我不知道 timespan
是什么,但假设它是 5 秒,你有同样的问题:试图将分组的 sub-observable 变成列表或可枚举意味着等待 sub-observable 完成,阻塞线程(或任务)。
您可以通过使用 Buffer
运算符延迟计算分组 sub-observable:
var timespan = TimeSpan.FromSeconds(5);
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(timespan))
.SelectMany(x => x
.Buffer(timespan)
.Select(list => Observable.FromAsync(() =>
{
var aggregate = AggregateTrades(x.Key.Symbol, x.Key.AccountId, x.Key.Tenant, list));
return Mediator.Publish(aggregate, cts.Token);
}))
)
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
这实质上意味着在 timespan
结束之前,组中的项目将聚集在 Buffer
内的列表中。一旦 timespan
启动,它们就会作为列表发布,然后调解器发布。