c# Rate Limiting foreach 迭代
c# Rate Limiting foreach iterations
基本上我试图能够对列表迭代的执行进行速率限制。
我真的很喜欢使用 RX 的想法,因为我可以在它之上构建,并有一个更优雅的解决方案,但不必使用 RX 来完成。
我在许多比我聪明得多的人的帮助下制定了这个。我的问题是我希望能够说 someCollection.RateLimitedForEach(rate, function),并让它最终阻塞直到我们已经完成处理...或者让它成为一个异步方法。
函数下方的演示在控制台应用程序中运行,但如果我在 foreach 之后关闭,它会立即 returns。
我有点不知所措这是否可以修复,或者我是否应该采取完全不同的方式
public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
{
list.ToObservable().Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
.Do(action).Subscribe();
}
//rate limits iteration of foreach... keep in mind this is not the same thing as just sleeping for a second
//between each iteration, this is saying at the start of the next iteration, if minimum delay time hasnt past, hold until it has
var maxRequestsPerMinute = 60;
requests.RateLimitedForeach(60/maxRequestsPerMinute,(request) => SendRequest(request));
but it wouldn't have to be done using RX
以下是同步执行的方法:
public static void RateLimitedForEach<T>(
this List<T> list,
double minumumDelay,
Action<T> action)
{
foreach (var item in list)
{
Stopwatch sw = Stopwatch.StartNew();
action(item);
double left = minumumDelay - sw.Elapsed.TotalSeconds;
if(left > 0)
Thread.Sleep(TimeSpan.FromSeconds(left));
}
}
下面是异步执行的方法(只有潜在的等待是异步的):
public static async Task RateLimitedForEachAsync<T>(
this List<T> list,
double minumumDelay,
Action<T> action)
{
foreach (var item in list)
{
Stopwatch sw = Stopwatch.StartNew();
action(item);
double left = minumumDelay - sw.Elapsed.TotalSeconds;
if (left > 0)
await Task.Delay(TimeSpan.FromSeconds(left));
}
}
请注意,您可以更改异步版本以使操作像这样自异步:
public static async Task RateLimitedForEachAsync<T>(
this List<T> list,
double minumumDelay,
Func<T,Task> async_task_func)
{
foreach (var item in list)
{
Stopwatch sw = Stopwatch.StartNew();
await async_task_func(item);
double left = minumumDelay - sw.Elapsed.TotalSeconds;
if (left > 0)
await Task.Delay(TimeSpan.FromSeconds(left));
}
}
如果您需要对每一项 运行 执行异步操作,这将很有帮助。
最后一个版本可以这样使用:
List<string> list = new List<string>();
list.Add("1");
list.Add("2");
var task = list.RateLimitedForEachAsync(1.0, async str =>
{
//Do something asynchronous here, e.g.:
await Task.Delay(500);
Console.WriteLine(DateTime.Now + ": " + str);
});
现在您应该等待 task
完成。如果是Main
方法,那么就需要这样同步等待:
task.Wait();
另一方面,如果您在异步方法中,则需要像这样异步等待:
await task;
RX Throttle 是不是你想要的?
https://msdn.microsoft.com/en-us/library/hh229400(v=vs.103).aspx
您需要了解的概念是,主线程不会等待您的 RateLimitedForEach
调用完成。此外 - 在您的控制台应用程序上 - 一旦主线程结束,进程就会结束。
这是什么意思?这意味着无论 RateLimitedForEach
上的观察者是否完成执行,进程都会结束。
注意:用户可能仍会强制执行您的应用程序以完成,这是一件好事。如果您希望在不挂起 UI 的情况下等待,您可以使用表单应用程序,如果您不希望用户关闭与进程相关的 windows,您可以使用服务。
使用 Task 是 我在下面介绍的内容。
请注意,在控制台应用程序上使用任务时,您仍然需要等待任务以防止主线程在 RateLimitedForEach
完成其工作之前完成。仍然建议远离控制台应用程序。
如果您坚持继续使用您的代码,您可以调整它以挂起调用线程直到完成:
public static void RateLimitedForEach<T>
(
this List<T> list,
double minumumDelay,
Action<T> action
)
{
using (var waitHandle = new ManualResetEventSlim(false))
{
var mainObservable = list.ToObservable();
var intervalObservable = Observable.Interval(TimeSpan.FromSeconds(minumumDelay));
var zipObservable = mainObservable .Zip(intervalObservable, (v, _) => v);
zipObservable.Subscribe
(
action,
error => GC.KeepAlive(error), // Ingoring them, as you already were
() => waitHandle.Set() // <-- "Done signal"
);
waitHandle.Wait(); // <--- Waiting on the observer to complete
}
}
您的代码近乎完美。
试试这个:
public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
{
list
.ToObservable()
.Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
.Do(action)
.ToArray()
.Wait();
}
基本上我试图能够对列表迭代的执行进行速率限制。
我真的很喜欢使用 RX 的想法,因为我可以在它之上构建,并有一个更优雅的解决方案,但不必使用 RX 来完成。
我在许多比我聪明得多的人的帮助下制定了这个。我的问题是我希望能够说 someCollection.RateLimitedForEach(rate, function),并让它最终阻塞直到我们已经完成处理...或者让它成为一个异步方法。
函数下方的演示在控制台应用程序中运行,但如果我在 foreach 之后关闭,它会立即 returns。
我有点不知所措这是否可以修复,或者我是否应该采取完全不同的方式
public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
{
list.ToObservable().Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
.Do(action).Subscribe();
}
//rate limits iteration of foreach... keep in mind this is not the same thing as just sleeping for a second
//between each iteration, this is saying at the start of the next iteration, if minimum delay time hasnt past, hold until it has
var maxRequestsPerMinute = 60;
requests.RateLimitedForeach(60/maxRequestsPerMinute,(request) => SendRequest(request));
but it wouldn't have to be done using RX
以下是同步执行的方法:
public static void RateLimitedForEach<T>(
this List<T> list,
double minumumDelay,
Action<T> action)
{
foreach (var item in list)
{
Stopwatch sw = Stopwatch.StartNew();
action(item);
double left = minumumDelay - sw.Elapsed.TotalSeconds;
if(left > 0)
Thread.Sleep(TimeSpan.FromSeconds(left));
}
}
下面是异步执行的方法(只有潜在的等待是异步的):
public static async Task RateLimitedForEachAsync<T>(
this List<T> list,
double minumumDelay,
Action<T> action)
{
foreach (var item in list)
{
Stopwatch sw = Stopwatch.StartNew();
action(item);
double left = minumumDelay - sw.Elapsed.TotalSeconds;
if (left > 0)
await Task.Delay(TimeSpan.FromSeconds(left));
}
}
请注意,您可以更改异步版本以使操作像这样自异步:
public static async Task RateLimitedForEachAsync<T>(
this List<T> list,
double minumumDelay,
Func<T,Task> async_task_func)
{
foreach (var item in list)
{
Stopwatch sw = Stopwatch.StartNew();
await async_task_func(item);
double left = minumumDelay - sw.Elapsed.TotalSeconds;
if (left > 0)
await Task.Delay(TimeSpan.FromSeconds(left));
}
}
如果您需要对每一项 运行 执行异步操作,这将很有帮助。
最后一个版本可以这样使用:
List<string> list = new List<string>();
list.Add("1");
list.Add("2");
var task = list.RateLimitedForEachAsync(1.0, async str =>
{
//Do something asynchronous here, e.g.:
await Task.Delay(500);
Console.WriteLine(DateTime.Now + ": " + str);
});
现在您应该等待 task
完成。如果是Main
方法,那么就需要这样同步等待:
task.Wait();
另一方面,如果您在异步方法中,则需要像这样异步等待:
await task;
RX Throttle 是不是你想要的?
https://msdn.microsoft.com/en-us/library/hh229400(v=vs.103).aspx
您需要了解的概念是,主线程不会等待您的 RateLimitedForEach
调用完成。此外 - 在您的控制台应用程序上 - 一旦主线程结束,进程就会结束。
这是什么意思?这意味着无论 RateLimitedForEach
上的观察者是否完成执行,进程都会结束。
注意:用户可能仍会强制执行您的应用程序以完成,这是一件好事。如果您希望在不挂起 UI 的情况下等待,您可以使用表单应用程序,如果您不希望用户关闭与进程相关的 windows,您可以使用服务。
使用 Task 是
请注意,在控制台应用程序上使用任务时,您仍然需要等待任务以防止主线程在 RateLimitedForEach
完成其工作之前完成。仍然建议远离控制台应用程序。
如果您坚持继续使用您的代码,您可以调整它以挂起调用线程直到完成:
public static void RateLimitedForEach<T>
(
this List<T> list,
double minumumDelay,
Action<T> action
)
{
using (var waitHandle = new ManualResetEventSlim(false))
{
var mainObservable = list.ToObservable();
var intervalObservable = Observable.Interval(TimeSpan.FromSeconds(minumumDelay));
var zipObservable = mainObservable .Zip(intervalObservable, (v, _) => v);
zipObservable.Subscribe
(
action,
error => GC.KeepAlive(error), // Ingoring them, as you already were
() => waitHandle.Set() // <-- "Done signal"
);
waitHandle.Wait(); // <--- Waiting on the observer to complete
}
}
您的代码近乎完美。
试试这个:
public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
{
list
.ToObservable()
.Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
.Do(action)
.ToArray()
.Wait();
}