具有流大小的缓冲区而不是流反应性扩展 C# 的数量

buffer with stream size instead of number of streams reactive extension C#

  1. 我有一个 Producer() 将数据推送到 blocking 集合。
  2. Consumer() 中,我使用 System.Reactive (4.1.2).
  3. 作为 Observable 订阅了 blocking 合集
  4. 我正在使用 Buffer,但只能缓冲一定数量的流。

问题 - 我可以将 buffer 运算符用于流的大小而不是流的数量吗?

当缓冲区大小超过(例如 1024 KB 或 1 MB)时,创建新缓冲区?

    class Program
    {
        private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();

    private static void Producer()
    {
        int ctr = 1;
        while (ctr <= 11)
        {
            MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
            Thread.Sleep(1000);
            ctr++;
        }
    }

    private static void Consumer()
    {
        var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();

        var bufferedNumberStream = observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
                                    .Subscribe(ts =>
                                    {
                                        WriteToFile(ts.ToList());
                                    });
    }

    private static void WriteToFile(List<Message> listToWrite)
    {
        using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
        {
            outFile.Write(JsonConvert.SerializeObject(listToWrite));
        }
    }

    static void Main(string[] args)
    {
        var producer = Task.Factory.StartNew(() => Producer());
        var consumer = Task.Factory.StartNew(() => Consumer());
        Console.Read();
     }
    }

可观察的扩展方法,

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
                                                                            TimeSpan threshold, int noOfStream)
    {
        return Observable.Create<IList<TSource>>((obs) =>
        {
            return source.GroupByUntil(_ => true,
                                       g => g.Throttle(threshold).Select(_ => Unit.Default)
                                             .Merge(g.Buffer(noOfStream).Select(_ => Unit.Default)))
                         .SelectMany(i => i.ToList())
                         .Subscribe(obs);
        });
    }

很高兴看到正在使用的扩展方法:)

您可以稍微修改它以使其 Scan 运行 数量 Message 尺寸。通过这样做,我们失去了类型泛型。

public class Message
{
    public string Payload { get; set; }
    public int Size { get; set; }
}

public static IObservable<IList<Message>> BufferWithThrottle(this IObservable<Message> source,
                                                     TimeSpan threshold, int maxSize)
{
    return Observable.Create<IList<Message>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge(g.Select( i => i.Size)
                                                 .Scan(0, (a, b) => a + b)
                                                 .Where(a => a >= maxSize)
                                                 .Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}