如何按 ID Rx 分组和限制对象
How to group and Throttle Object by ID Rx
我有相同类型的传入对象,但是如果一个对象 属性 IsThrottlable
设置为 false 而不管 ID 我不想限制它但是如果 IsThrottlable
设置为 true 我想通过 ID 每 3 秒限制一次对象。因此,如果具有相同 ID 的对象在 3 秒内出现 50 次,我想为最后一个对象发送 HTTPSend。
namespace BoatThrottle
{
class MData
{
public int ID { get; set; }
public bool IsThrottlable { get; set; }
public string Description { get; set; }
}
class Program
{
static void Main(string[] args)
{
Random rand = new Random();
while (true)
{
var data = GenerateRandomObj(rand);
SendData(data);
Task.Delay(rand.Next(100, 2000));
}
}
static MData GenerateRandomObj(Random rand)
{
return new MData() { ID = rand.Next(1, 20), Description = "Notification....", IsThrottlable = (rand.Next(2) == 1) };
}
static void SendData(MData mData)
{
if (mData.IsThrottlable)
{
_doValues.OnNext(mData);
var dd = ThrottledById(DoValues);
var observable =
dd
.Throttle(TimeSpan.FromMilliseconds(3000.0))
.ObserveOn(Scheduler.ThreadPool.DisableOptimizations());
_subscription =
observable
.ObserveOn(Scheduler.ThreadPool.DisableOptimizations())
.Subscribe(y =>
{
HTTPSend(y);
});
}
else
{
// MData object coming in IsThrottlable set to false always send this data NO throttling
HTTPSend(mData);
}
}
private static IDisposable? _subscription = null;
public static IObservable<MData> ThrottledById(IObservable<MData> observable)
{
return observable.Buffer(TimeSpan.FromSeconds(3))
.SelectMany(x =>
x.GroupBy(y => y.ID)
.Select(y => y.Last()));
}
private static readonly Subject<MData> _doValues = new Subject<MData>();
public static IObservable<MData> DoValues { get { return _doValues; } }
static void HTTPSend(MData mData)
{
Console.WriteLine("===============HTTP===>> " + mData.ID + " " + mData.Description + " " + mData.IsThrottlable);
}
}
}
编辑:
例如 3 秒内全部收到
MData ID = 1,IsThrottlable = False,说明 =“通知”
MData ID = 2,IsThrottlable = True,说明 =“Notify1”
MData ID = 2,IsThrottlable = True,说明 =“Notify2”
MData ID = 9,IsThrottlable = False,说明 =“Notify2”
MData ID = 2,IsThrottlable = True,说明 =“Notify3”
MData ID = 2,IsThrottlable = True,说明 =“Notify4”
MData ID = 3,IsThrottlable = True,说明 =“通知”
MData ID = 4,IsThrottlable = True,说明 =“通知”
MData ID = 5,IsThrottlable = True,说明 =“Notify1”
MData ID = 5,IsThrottlable = True,说明 =“Notify2”
MData ID = 8,IsThrottlable = True,说明 =“Notify1”
MData ID = 8,IsThrottlable = True,说明 =“Notify2”
MData ID = 8,IsThrottlable = True,说明 =“Notify3”
MData ID = 8,IsThrottlable = True,说明 =“Notify4”
MData ID = 8,IsThrottlable = True,说明 =“Notify5”
MData ID = 8,IsThrottlable = True,说明 =“Notify6”
预计在前 3 秒:
- MData ID = 1,IsThrottlable = False,说明 =“通知”
- MData ID = 9,IsThrottlable = False,说明 =“Notify2”
- MData ID = 2,IsThrottlable = True,说明 =“Notify4”
- MData ID = 3,IsThrottlable = True,说明 =“通知”
- MData ID = 4,IsThrottlable = True,说明 =“通知”
- MData ID = 5,IsThrottlable = True,说明 =“Notify2”
- MData ID = 8,IsThrottlable = True,说明 =“Notify6”
一种方法是按 IsThrottlable
属性 对序列进行分组。这样您将获得一个包含两个子序列的嵌套序列,一个包含可调节元素,一个包含 non-throttleable 元素。然后,您可以相应地转换两个子序列中的每一个,最后使用 SelectMany
运算符将嵌套序列展平回包含两个转换子序列发出的元素的平面序列。
包含 non-throttleable 个元素的子序列不需要转换,因此您可以 return 原样。
包含可节流元素的子序列需要通过 ID
属性 进一步分组,生成更薄的子序列,这些子序列仅包含具有相同 ID 的可节流元素。这些是需要限制的序列:
IObservable<MData> throttled = source
.GroupBy(x => x.IsThrottlable)
.SelectMany(g1 =>
{
if (!g1.Key) return g1; // Not throttleable, return it as is.
return g1
.GroupBy(x => x.ID)
.SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3)));
});
最后你会得到一个包含可节流项目和 non-throttleable 项目的平面序列,可节流项目已经被 id 节流了。
SelectMany
运算符本质上是 Select
+Merge
运算符的组合。
我决定采用你在问题中发布的最终实现,但它应该作为答案,并以最惯用的 Rx 方式为你清理查询。
这是我的代码版本:
public MainWindow()
{
InitializeComponent();
Debug.Print("========================");
_subscription =
Observable
.Generate(0, x => true, x => x + 1,
x => new MData() { ID = Random.Shared.Next(1, 3), Description = "Notification....", IsThrottlable = Random.Shared.Next(2) == 1 },
x => TimeSpan.FromMilliseconds(Random.Shared.Next(100, 2000)))
.GroupBy(m => m.IsThrottlable)
.SelectMany(g =>
g.Key
? g.GroupBy(x => x.ID).SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3.0)))
: g)
.SelectMany(m => Observable.Start(() => HTTPSend(m)))
.Subscribe();
}
最后的.SelectMany(m => Observable.Start(() => HTTPSend(m)))
可能需要写成.Select(m => Observable.Start(() => HTTPSend(m))).Merge(1)
。
我有相同类型的传入对象,但是如果一个对象 属性 IsThrottlable
设置为 false 而不管 ID 我不想限制它但是如果 IsThrottlable
设置为 true 我想通过 ID 每 3 秒限制一次对象。因此,如果具有相同 ID 的对象在 3 秒内出现 50 次,我想为最后一个对象发送 HTTPSend。
namespace BoatThrottle
{
class MData
{
public int ID { get; set; }
public bool IsThrottlable { get; set; }
public string Description { get; set; }
}
class Program
{
static void Main(string[] args)
{
Random rand = new Random();
while (true)
{
var data = GenerateRandomObj(rand);
SendData(data);
Task.Delay(rand.Next(100, 2000));
}
}
static MData GenerateRandomObj(Random rand)
{
return new MData() { ID = rand.Next(1, 20), Description = "Notification....", IsThrottlable = (rand.Next(2) == 1) };
}
static void SendData(MData mData)
{
if (mData.IsThrottlable)
{
_doValues.OnNext(mData);
var dd = ThrottledById(DoValues);
var observable =
dd
.Throttle(TimeSpan.FromMilliseconds(3000.0))
.ObserveOn(Scheduler.ThreadPool.DisableOptimizations());
_subscription =
observable
.ObserveOn(Scheduler.ThreadPool.DisableOptimizations())
.Subscribe(y =>
{
HTTPSend(y);
});
}
else
{
// MData object coming in IsThrottlable set to false always send this data NO throttling
HTTPSend(mData);
}
}
private static IDisposable? _subscription = null;
public static IObservable<MData> ThrottledById(IObservable<MData> observable)
{
return observable.Buffer(TimeSpan.FromSeconds(3))
.SelectMany(x =>
x.GroupBy(y => y.ID)
.Select(y => y.Last()));
}
private static readonly Subject<MData> _doValues = new Subject<MData>();
public static IObservable<MData> DoValues { get { return _doValues; } }
static void HTTPSend(MData mData)
{
Console.WriteLine("===============HTTP===>> " + mData.ID + " " + mData.Description + " " + mData.IsThrottlable);
}
}
}
编辑:
例如 3 秒内全部收到
MData ID = 1,IsThrottlable = False,说明 =“通知”
MData ID = 2,IsThrottlable = True,说明 =“Notify1”
MData ID = 2,IsThrottlable = True,说明 =“Notify2”
MData ID = 9,IsThrottlable = False,说明 =“Notify2”
MData ID = 2,IsThrottlable = True,说明 =“Notify3”
MData ID = 2,IsThrottlable = True,说明 =“Notify4”
MData ID = 3,IsThrottlable = True,说明 =“通知”
MData ID = 4,IsThrottlable = True,说明 =“通知”
MData ID = 5,IsThrottlable = True,说明 =“Notify1”
MData ID = 5,IsThrottlable = True,说明 =“Notify2”
MData ID = 8,IsThrottlable = True,说明 =“Notify1”
MData ID = 8,IsThrottlable = True,说明 =“Notify2”
MData ID = 8,IsThrottlable = True,说明 =“Notify3”
MData ID = 8,IsThrottlable = True,说明 =“Notify4”
MData ID = 8,IsThrottlable = True,说明 =“Notify5”
MData ID = 8,IsThrottlable = True,说明 =“Notify6”
预计在前 3 秒:
- MData ID = 1,IsThrottlable = False,说明 =“通知”
- MData ID = 9,IsThrottlable = False,说明 =“Notify2”
- MData ID = 2,IsThrottlable = True,说明 =“Notify4”
- MData ID = 3,IsThrottlable = True,说明 =“通知”
- MData ID = 4,IsThrottlable = True,说明 =“通知”
- MData ID = 5,IsThrottlable = True,说明 =“Notify2”
- MData ID = 8,IsThrottlable = True,说明 =“Notify6”
一种方法是按 IsThrottlable
属性 对序列进行分组。这样您将获得一个包含两个子序列的嵌套序列,一个包含可调节元素,一个包含 non-throttleable 元素。然后,您可以相应地转换两个子序列中的每一个,最后使用 SelectMany
运算符将嵌套序列展平回包含两个转换子序列发出的元素的平面序列。
包含 non-throttleable 个元素的子序列不需要转换,因此您可以 return 原样。
包含可节流元素的子序列需要通过 ID
属性 进一步分组,生成更薄的子序列,这些子序列仅包含具有相同 ID 的可节流元素。这些是需要限制的序列:
IObservable<MData> throttled = source
.GroupBy(x => x.IsThrottlable)
.SelectMany(g1 =>
{
if (!g1.Key) return g1; // Not throttleable, return it as is.
return g1
.GroupBy(x => x.ID)
.SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3)));
});
最后你会得到一个包含可节流项目和 non-throttleable 项目的平面序列,可节流项目已经被 id 节流了。
SelectMany
运算符本质上是 Select
+Merge
运算符的组合。
我决定采用你在问题中发布的最终实现,但它应该作为答案,并以最惯用的 Rx 方式为你清理查询。
这是我的代码版本:
public MainWindow()
{
InitializeComponent();
Debug.Print("========================");
_subscription =
Observable
.Generate(0, x => true, x => x + 1,
x => new MData() { ID = Random.Shared.Next(1, 3), Description = "Notification....", IsThrottlable = Random.Shared.Next(2) == 1 },
x => TimeSpan.FromMilliseconds(Random.Shared.Next(100, 2000)))
.GroupBy(m => m.IsThrottlable)
.SelectMany(g =>
g.Key
? g.GroupBy(x => x.ID).SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3.0)))
: g)
.SelectMany(m => Observable.Start(() => HTTPSend(m)))
.Subscribe();
}
最后的.SelectMany(m => Observable.Start(() => HTTPSend(m)))
可能需要写成.Select(m => Observable.Start(() => HTTPSend(m))).Merge(1)
。