rx.net 锁定 ToEnumerable 的使用

rx.net locking up from use of ToEnumerable

我正在尝试转换以下语句,以便我可以在所选列表旁边获取密钥:

var feed = new Subject<TradeExecuted>();

feed
  .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
  .SelectMany(x => x.ToList())
  .Select(trades => Observable.FromAsync(() => Mediator.Publish(trades, cts.Token)))
  .Concat() // Ensure that the results are serialized.
  .Subscribe(cts.Token); // Check status of calls.

以上有效,而以下无效 - 当我尝试遍历列表时,它锁定了。

feed
  .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(timespan))
  .Select(x => Observable.FromAsync(() =>
  {
      var list = x.ToEnumerable(); // <---- LOCK UP if we use list.First() etc
      var aggregate = AggregateTrades(x.Key.Symbol, x.Key.AccountId, x.Key.Tenant, list);
      return Mediator.Publish(aggregate, cts.Token);
  }))
  .Concat()
  .Subscribe(cts.Token); // Check status of calls.

我显然做错了,而且可能很可怕!

回到最初的代码,我怎样才能获得可枚举列表旁边的密钥(并避免下面的 hack)?


作为旁注,下面的代码有效,但它是一个讨厌的 hack,我从第一个列表项中获取密钥:

feed
  .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
  .SelectMany(x => x.ToList())
  .Select(trades => Observable.FromAsync(() =>
  {
      var firstTrade = trades.First();
      var aggregate = AggregateTrades(firstTrade.Execution.Contract.Symbol, firstTrade.Execution.AccountId, firstTrade.Tenant, trades);
      return Mediator.Publish(aggregate, cts.Token);
  }))
  .Concat() // Ensure that the results are serialized.
  .Subscribe(cts.Token); // Check status of calls.

您的代码的所有版本都试图急切地评估分组 sub-observable。由于在 v1 和 v3 中,您的组可观察性将 运行 最多 5 秒,这不是 horrible/awful,但它仍然不是很好。在 v2 中,我不知道 timespan 是什么,但假设它是 5 秒,你有同样的问题:试图将分组的 sub-observable 变成列表或可枚举意味着等待 sub-observable 完成,阻塞线程(或任务)。

您可以通过使用 Buffer 运算符延迟计算分组 sub-observable:

来解决此问题
var timespan = TimeSpan.FromSeconds(5);
feed
  .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(timespan))
  .SelectMany(x => x
    .Buffer(timespan)
    .Select(list => Observable.FromAsync(() => 
    {
      var aggregate = AggregateTrades(x.Key.Symbol, x.Key.AccountId, x.Key.Tenant, list));
      return Mediator.Publish(aggregate, cts.Token);
    }))
  )
  .Concat()  // Ensure that the results are serialized.
  .Subscribe(cts.Token); // Check status of calls.

这实质上意味着在 timespan 结束之前,组中的项目将聚集在 Buffer 内的列表中。一旦 timespan 启动,它们就会作为列表发布,然后调解器发布。