具有流大小的缓冲区而不是流反应性扩展 C# 的数量
buffer with stream size instead of number of streams reactive extension C#
- 我有一个
Producer()
将数据推送到 blocking
集合。
- 在
Consumer()
中,我使用 System.Reactive
(4.1.2). 作为 Observable
订阅了 blocking
合集
- 我正在使用
Buffer
,但只能缓冲一定数量的流。
问题 - 我可以将 buffer
运算符用于流的大小而不是流的数量吗?
当缓冲区大小超过(例如 1024 KB 或 1 MB)时,创建新缓冲区?
class Program
{
private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();
private static void Producer()
{
int ctr = 1;
while (ctr <= 11)
{
MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
Thread.Sleep(1000);
ctr++;
}
}
private static void Consumer()
{
var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();
var bufferedNumberStream = observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
.Subscribe(ts =>
{
WriteToFile(ts.ToList());
});
}
private static void WriteToFile(List<Message> listToWrite)
{
using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
{
outFile.Write(JsonConvert.SerializeObject(listToWrite));
}
}
static void Main(string[] args)
{
var producer = Task.Factory.StartNew(() => Producer());
var consumer = Task.Factory.StartNew(() => Consumer());
Console.Read();
}
}
可观察的扩展方法,
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
TimeSpan threshold, int noOfStream)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Buffer(noOfStream).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
很高兴看到正在使用的扩展方法:)
您可以稍微修改它以使其 Scan
运行 数量 Message
尺寸。通过这样做,我们失去了类型泛型。
public class Message
{
public string Payload { get; set; }
public int Size { get; set; }
}
public static IObservable<IList<Message>> BufferWithThrottle(this IObservable<Message> source,
TimeSpan threshold, int maxSize)
{
return Observable.Create<IList<Message>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Select( i => i.Size)
.Scan(0, (a, b) => a + b)
.Where(a => a >= maxSize)
.Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
- 我有一个
Producer()
将数据推送到blocking
集合。 - 在
Consumer()
中,我使用System.Reactive
(4.1.2). 作为 - 我正在使用
Buffer
,但只能缓冲一定数量的流。
Observable
订阅了 blocking
合集
问题 - 我可以将 buffer
运算符用于流的大小而不是流的数量吗?
当缓冲区大小超过(例如 1024 KB 或 1 MB)时,创建新缓冲区?
class Program
{
private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();
private static void Producer()
{
int ctr = 1;
while (ctr <= 11)
{
MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
Thread.Sleep(1000);
ctr++;
}
}
private static void Consumer()
{
var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();
var bufferedNumberStream = observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
.Subscribe(ts =>
{
WriteToFile(ts.ToList());
});
}
private static void WriteToFile(List<Message> listToWrite)
{
using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
{
outFile.Write(JsonConvert.SerializeObject(listToWrite));
}
}
static void Main(string[] args)
{
var producer = Task.Factory.StartNew(() => Producer());
var consumer = Task.Factory.StartNew(() => Consumer());
Console.Read();
}
}
可观察的扩展方法,
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
TimeSpan threshold, int noOfStream)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Buffer(noOfStream).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
很高兴看到正在使用的扩展方法:)
您可以稍微修改它以使其 Scan
运行 数量 Message
尺寸。通过这样做,我们失去了类型泛型。
public class Message
{
public string Payload { get; set; }
public int Size { get; set; }
}
public static IObservable<IList<Message>> BufferWithThrottle(this IObservable<Message> source,
TimeSpan threshold, int maxSize)
{
return Observable.Create<IList<Message>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Select( i => i.Size)
.Scan(0, (a, b) => a + b)
.Where(a => a >= maxSize)
.Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}