将多个可观察对象合并到单个字典中

Merging multiple observables into single dictionary

我想将多个 observables 组合成一个字典对象,其中每个 returns 一个单独的 Update 对象。

这是我想要实现的示例:

private IObservable<IDictionary<string, IUpdate>> CreateUpdateStreams(Product product)
{
  var codeObservables = product.Codes.Select(code => CreateUpdateStream(code)).ToList();

  //??? 
  return pointObs.Merge().Select(update => ...);
}


private IObservable<IUpdate> CreateUpdateStream(string code)
{
  ...
  //return an observable of IUpdate
}

产品 = Foo

Product.Codes = {代码 1, 代码 2, 代码 3}

IDictionary = {Code1, "a"}, {Code2, "b"}, {Code3, "c"}

根据更新的值(在本例中为 a/b/c),将对相应的代码进行不同的更改,例如设置 属性 如 Code.State = "a",等等

由于每个 codeObservable 都会以不同的速率更新,合并似乎是明智的起点。我不确定如何让来自单个可观察对象的更新更新一个保留过去值的字典对象。

这是您的问题的一个例子,它利用了匿名类型。它依赖于字典的副作用。请注意,由于 Rx 保证顺序行为,因此不需要在字典上进行同步。

private IObservable<IReadOnlyDictionary<string, IUpdate>> CreateUpdateStreams(Product product)
    {
        var dictionary = new Dictionary<string, IUpdate>();
         return
          product.Codes.Select(
              code => CreateUpdateStream(code).Select(update => new {Update = update, Code = code}))
              .Merge()
              .Do(element => dictionary.Add(element.Code, element.Update))
              .Select(_ => dictionary);
    }

注意我把方法签名改成return IObservable<IReadOnlyDictionary<,>> 防止客户端代码篡改字典。另一种选择是每次 return 字典的新副本。这确保了不可变行为(但可能会影响性能,具体取决于字典的大小),如下所示:

private IObservable<IDictionary<string, IUpdate>> CreateUpdateStreams(Product product)
    {
        var dictionary = new Dictionary<string, IUpdate>();
        return
            product.Codes.Select(
                code => CreateUpdateStream(code).Select(update => new {Update = update, Code = code}))
                .Merge()
                .Select(element =>
                {
                    dictionary.Add(element.Code, element.Update);
                    return new Dictionary<string, IUpdate>(dictionary);
                });
    }