对缓冲的 Observable 进行排序

Sorting buffered Observables

我有一个生成速度非常快的令牌流和一个相对较慢的处理器。令牌分为三种子类型,我更希望按优先级处理它们。因此,我希望令牌在生成并等待处理后进行缓冲,并按优先级对缓冲区进行排序。

这是我的 类:

public enum Priority
{
    High   = 3,
    Medium = 2,
    Low    = 1
}

public class Base : IComparable<Base>
{
    public int Id { get; set; }

    public int CompareTo(Base other)
    {
        return Id.CompareTo(other.Id);
    }
}

public class Foo : Base { }
public class Bar : Base { }
public class Baz : Base { }

public class Token : IComparable<Token>
{
    private readonly string _toString;

    public Foo Foo { get; }

    public Bar Bar { get; }

    public Baz Baz { get; }

    public Priority Priority =>
        Baz == null
            ? Bar == null
                ? Priority.High
                : Priority.Medium
            : Priority.Low;

    public int CompareTo(Token other)
    {
        if (Priority > other.Priority)
        {
            return -1;
        }

        if (Priority < other.Priority)
        {
            return 1;
        }

        switch (Priority)
        {
            case Priority.High:
                return Foo.CompareTo(other.Foo);
            case Priority.Medium:
                return Bar.CompareTo(other.Bar);
            case Priority.Low:
                return Baz.CompareTo(other.Baz);
            default:
                throw new ArgumentOutOfRangeException();
        }
    }

    public override string ToString()
    {
        return _toString;
    }

    public Token(Foo foo)
    {
        _toString = $"{nameof(Foo)}:{foo.Id}";
        Foo = foo;
    }

    public Token(Foo foo, Bar bar) : this(foo)
    {
        _toString += $":{nameof(Bar)}:{bar.Id}";
        Bar = bar;
    }

    public Token(Foo foo, Baz baz) : this(foo)
    {
        _toString += $":{nameof(Baz)}:{baz.Id}";
        Baz = baz;
    }
}

这是我的生产者代码:

var random = new Random();
var bazId = 0;
var barId = 0;

var fooTokens = (from id in Observable.Interval(TimeSpan.FromSeconds(1))
                                      .Select(Convert.ToInt32)
                                      .Take(3)
                 select new Token(new Foo { Id = id }))
                .Publish();

var barTokens = (from fooToken in fooTokens
                 from id in Observable.Range(0, random.Next(5, 10))
                                      .Select(_ => Interlocked.Increment(ref barId))
                 select new Token(fooToken.Foo, new Bar { Id = id }))
                .Publish();

var bazTokens = (from barToken in barTokens
                 from id in Observable.Range(0, random.Next(1, 5))
                                      .Select(_ => Interlocked.Increment(ref bazId))
                 select new Token(barToken.Foo, new Baz { Id = id }))
                .Publish();

var tokens = bazTokens.Merge(barTokens)
                      .Merge(fooTokens)
                      .Do(dt =>
                      {
                          Console.ForegroundColor = ConsoleColor.Red;
                          Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
                      });

// Subscription

bazTokens.Connect();
barTokens.Connect();
fooTokens.Connect();

但是我对如何缓冲和排序令牌有点困惑。如果我这样做,代币似乎是同时生产和消费的,这表明幕后正在进行一些缓冲,但我无法控制它。

tokens.Subscribe(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

如果我使用 TPL 数据流 ActionBlock,我可以看到正确生成和处理的令牌,但我仍然不确定如何进行排序。

var proc = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

tokens.Subscribe(dt => proc.Post(dt));

如果有任何想法或下一步的指示,我们将不胜感激!

更新:

我有事要做。我添加了一个助手来清理显示测试数据的代码:

private static void Display(Token dt, ConsoleColor col, int? wait = null)
{
    if (wait.HasValue)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(wait.Value));
    }
    Console.ForegroundColor = col;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}

我加了一个SortedSet:

var set = new SortedSet<Token>();

var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(dt => Display(dt, ConsoleColor.Red));

tokens.Subscribe(dt => set.Add(dt));

我还添加了一个消费者,虽然我不喜欢我的实现:

var source = new CancellationTokenSource();

Task.Run(() =>
{
    while (!source.IsCancellationRequested)
    {
        var dt = set.FirstOrDefault();
        if (dt == null)
        {
            continue;
        }

        if (set.Remove(dt))
        {
            Display(dt, ConsoleColor.Green, 250);
        }
    }
}, source.Token);

所以,现在我得到了我正在寻找的结果,但是 a) 我对 while 投票不满意,b) 如果我想要多个消费者,我会运行 进入竞争条件。所以,如果有人有的话,我仍在寻找更好的实现!

你想要的容器是一个优先级队列,不幸的是在 .net 运行时没有实现(在 c++ stl/cli 但 priority_queue 不能用于其他语言).

现有的非 MS 容器可以充当此角色,您需要搜索并查看结果以选择满足您需要的容器。

使用数据流,您可以过滤标记,使每个优先级在您的管道中沿着不同的路径下降。通过对输入的每个优先级 link 使用谓词来过滤令牌。然后由您决定如何根据优先级给予偏好。

排序:

var highPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var midPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var lowPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var proc = new BufferBlock<Token>();

proc.LinkTo(highPriority, dt => dt.Priority == Priority.High);
proc.LinkTo(midPriority, dt => dt.Priority == Priority.Medium);
proc.LinkTo(lowPriority, dt => dt.Priority == Priority.Low);

tokens.Subscribe(dt => proc.Post(dt));

优先考虑更高优先级项目的一种方法是允许比默认顺序处理更多的项目。您可以通过为每个优先级块设置 MaxDegreeOfParallelism 来做到这一点。

给予偏好:

var highPriOptions = new DataflowLinkOptions(){MaxDegreeOfParallelism = 3}
var highPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}, highPriOptions);

var midPriOptions = new DataflowLinkOptions(){MaxDegreeOfParallelism = 2}   
var midPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}, midPriOptions);

var lowPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var proc = new BufferBlock<Token>();

proc.LinkTo(highPriority, dt => dt.Priority == Priority.High);
proc.LinkTo(midPriority, dt => dt.Priority == Priority.Medium);
proc.LinkTo(lowPriority, dt => dt.Priority == Priority.Low);

tokens.Subscribe(dt => proc.Post(dt));

这些示例绝不是完整的,但至少应该让您了解一下。

好的,所以我使用了一个普通的lock来访问SortedSet,然后增加了消费者的数量,它似乎工作正常,所以虽然我没能来使用完整的 RX 或拆分 RX / TPL DataFlow 解决方案,这现在可以满足我的要求,所以我将只展示我所做的更改除了原始问题中的更新外,还保留了它。

var set = new SortedSet<Token>();
var locker = new object();

var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(dt => Display(dt, ConsoleColor.Red));

tokens.Subscribe(dt =>
{
    lock (locker)
    {
        set.Add(dt);
    }
});

for (var i = 0; i < Environment.ProcessorCount; i++)
{
    Task.Run(() =>
    {
        while (!source.IsCancellationRequested)
        {
            Token dt;

            lock (locker)
            {
                dt = set.FirstOrDefault();
            }

            if (dt == null)
            {
                continue;
            }

            bool removed;

            lock (locker)
            {
                removed = set.Remove(dt);
            }

            if (removed)
            {
                Display(dt, ConsoleColor.Green, 750);
            }
        }
    }, source.Token);
}

感谢发布解决方案的人,感谢您花费的时间。

我认为这里的难题在于,您似乎真正追求的是基于快速、热门、推送源的拉模型的结果。您似乎想要的是尚未收到的 "highest" 优先级,但问题是 "received by what?" 如果您有多个订阅者,以不同的速度运行,他们每个人都可以对 "highest" 的内容有自己的看法曾是。

所以我的看法是,您想将源合并到一种反应式、优先(排序)队列中,当观察者准备好时,您可以从中提取结果。

我通过使用返回缓冲区的信号来解决这个问题,说 "my one observer is now ready to see the state of the prioritized list"。这是通过使用接收可观察到的关闭信号的 Buffer 重载来实现的。该缓冲区包含接收到的新元素列表,我只是将其合并到最后一个列表中,没有 'highest'。

该代码只是为了这个问题的目的而编写的演示代码 - 可能存在错误:

 using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RxTests
{
    class Program
    {
        static void Main(string[] args)
        {
            var p = new Program();
            p.TestPrioritisedBuffer();
            Console.ReadKey();


        }

        void TestPrioritisedBuffer()
        {
            var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Do((source) => Console.WriteLine("Source1:"+source));
            var source2 = Observable.Interval(TimeSpan.FromSeconds(5)).Scan((x,y)=>(x+100)).Do((source) => Console.WriteLine("Source2:" + source)); ;

            BehaviorSubject<bool> closingSelector = new BehaviorSubject<bool>(true);



            var m = Observable.Merge(source1, source2).
                Buffer(closingSelector).
                Select(s => new { list =s.ToList(), max=(long)0 }).
               Scan((x, y) =>
               {
                   var list = x.list.Union(y.list).OrderBy(k=>k);

                   var max = list.LastOrDefault();


                   var res = new
                   {
                       list = list.Take(list.Count()-1).ToList(),
                       max= max
                   };

                   return res;



               }
               ).
               Do((sorted) => Console.WriteLine("Sorted max:" + sorted.max + ".  Priority queue length:" + sorted.list.Count)).
               ObserveOn(Scheduler.Default); //observe on other thread

            m.Subscribe((v)=> { Console.WriteLine("Observed: "+v.max); Thread.Sleep(3000); closingSelector.OnNext(true); }) ;
        }
    }
}