如何在 rx.net 中组合 Grouped Observables?

How to combine GroupedObservables in rx.net?

我有一个 observable,我使用 GroupBy 来获取多个流。我实际上想要 Scan 每个子流的结果。假设可观察到的是产品价格,扫描结果是每种产品类型的平均价格。

我有另一个与 'products' 相关的事件流(比方说 "show product price" 事件),我想将它与上一个流的最新产品价格结合起来。因此,每组的 Scan 输出需要与事件流的每个元素相结合,以获得该事件产品的最新平均价格。

出于某种原因,我无法获得正确的语法,而且我整天都在抨击这个。有人可以帮忙吗?


更新

我正在添加下面的代码来说明大概的意图。

 public class Node
{
    private List<int> Details = new List<int>();

    public void AddInfo(int x)
    {
        Details.Add(x );
    }

    public Node(int x)
    {
        Details.Add(x);  
    }

    public int Index => Details[0]%10; //just to simplify the grouping and debugging

    public int Latest => Details.Last();
}

public class Message
{
    private static Random _random = new Random();

    public int MessageNodeInfo { get; private set; }

    public Message()
    {
        MessageNodeInfo = _random.Next(); 
    }
}


public class AccumulatingInfoTest
{


    private static Random _random=new Random();

    private IObservable<Message> MessageStream()
    {
        TimeSpan timeSpan = TimeSpan.FromSeconds(0.5);


        var ret= Observable.Generate(0,
            _ => { return true; }, 
            _ => { return 0; }, 
            _ => { return new Message(); },
            _=> timeSpan)
            .Publish()
            .RefCount();



        return ret;

    }


    public class ArbitraryCommonClass
    {
        public int K { get; set; }
        public Message M { get; set; }
        public Node D { get; set; }

        public ArbitraryCommonClass Combine(ArbitraryCommonClass a)
        {
            return new ArbitraryCommonClass()
            {
                K = this.K,
                M = this.M ?? a.M,
                D = this.D ?? a.D
            };
        }
    }

    public void Start()
    {

        var inputStream = MessageStream();

        inputStream.Subscribe(y => Console.WriteLine("Input: K " + y.MessageNodeInfo % 10 + " V " + y.MessageNodeInfo));


        var nodeInfoStream = inputStream
            .Select(nodeInfo => new Node(nodeInfo.MessageNodeInfo))
            .GroupBy(node => node.Index)
            .Select(groupedObservable => new
                        {
                            Key = groupedObservable.Key,
                            Observable = groupedObservable
                                .Scan(

                                    (nodeAcc, node) => { nodeAcc.AddInfo(node.Latest); return nodeAcc; }

                                    )
                                .Select(a => new ArbitraryCommonClass() { K = a.Index, M = (Message)null, D = a })

                        }
                    );

        var groupedMessageStream =
            inputStream
            .GroupBy(
                    m => new Node(m.MessageNodeInfo).Index
                    )
            .Select(a => new
                        {
                            Key =a.Key,
                            Observable = a.Select(b => new ArbitraryCommonClass() { K = a.Key, M = b, D = null })

                        });



        var combinedStreams = nodeInfoStream
            .Merge(groupedMessageStream)
            .GroupBy(s => s.Key)
            .Select(grp => grp
                .Scan(

                    (state, next) => new { Key = state.Key, Observable = Observable.CombineLatest(state.Observable, next.Observable, (x, y) => { return x.Combine(y); }) }
                )



            )
            .Merge()
            .SelectMany(x => x.Observable.Select(a=>a));

        combinedStreams.Where(x=>x.M!=null).Subscribe(x => Console.WriteLine(x.K + " " + x.M.MessageNodeInfo + " " + x.D.Latest));














    }
}

假设以下 class:

public class Product
{
    public string Type { get; set; } = "Default";
    public decimal Price { get; set; }
}

这是 GroupByScan 的用法(显示按类型分组的平均产品价格)。诀窍是 Select 在分组的可观察对象上到达各个分组,做任何事情,然后(大概)将它们合并回一起。您可以将 SelectMerge 折叠成一个 SelectMany,但分开后更容易阅读:

var productSubject = new Subject<Product>();
var printSignal = new Subject<Unit>();

var latestAverages = productSubject.GroupBy(p => p.Type)
    .Select(g => g
        .Scan((0, 0.0m), (state, item) => (state.Item1 + 1, state.Item2 + item.Price)) //hold in state the count and the running total for each group
        .Select(t => (g.Key, t.Item2 / t.Item1)) //divide to get the average
    )
    .Merge()
    .Scan(ImmutableDictionary<string, decimal>.Empty, (state, t) => state.SetItem(t.Key, t.Item2)); //Finally, cache the average by group.


printSignal.WithLatestFrom(latestAverages, (_, d) => d)
    .Subscribe(avgs =>
    {
        foreach (var avg in avgs)
        {
            Console.WriteLine($"ProductType: {avg.Key}. Average: {avg.Value}");
        }
        Console.WriteLine();
    });

var productsList = new List<Product>()
{
    new Product { Price = 1.00m },
    new Product { Price = 2.00m },
    new Product { Price = 3.00m },

    new Product { Price = 2.00m, Type = "Alternate" },
    new Product { Price = 4.00m, Type = "Alternate" },
    new Product { Price = 6.00m, Type = "Alternate" },
};

productsList.ForEach(p => productSubject.OnNext(p));

printSignal.OnNext(Unit.Default);
productSubject.OnNext(new Product { Price = 4.0m });
printSignal.OnNext(Unit.Default);
productSubject.OnNext(new Product { Price = 8.0m, Type = "Alternate" });
printSignal.OnNext(Unit.Default);

这使用了 nuget 包 System.Collections.Immutable