c# Rate Limiting foreach 迭代

c# Rate Limiting foreach iterations

基本上我试图能够对列表迭代的执行进行速率限制。

我真的很喜欢使用 RX 的想法,因为我可以在它之上构建,并有一个更优雅的解决方案,但不必使用 RX 来完成。

我在许多比我聪明得多的人的帮助下制定了这个。我的问题是我希望能够说 someCollection.RateLimitedForEach(rate, function),并让它最终阻塞直到我们已经完成处理...或者让它成为一个异步方法。

函数下方的演示在控制台应用程序中运行,但如果我在 foreach 之后关闭,它会立即 returns。

我有点不知所措这是否可以修复,或者我是否应该采取完全不同的方式

public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
{
    list.ToObservable().Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
    .Do(action).Subscribe();
}

//rate limits iteration of foreach... keep in mind this is not the same thing as just sleeping for a second
//between each iteration, this is saying at the start of the next iteration, if minimum delay time hasnt past, hold until it has
var maxRequestsPerMinute = 60;
requests.RateLimitedForeach(60/maxRequestsPerMinute,(request) =>   SendRequest(request));

but it wouldn't have to be done using RX

以下是同步执行的方法:

public static void RateLimitedForEach<T>(
    this List<T> list,
    double minumumDelay,
    Action<T> action)
{
    foreach (var item in list)
    {
        Stopwatch sw = Stopwatch.StartNew();

        action(item);

        double left = minumumDelay - sw.Elapsed.TotalSeconds;

        if(left > 0)
            Thread.Sleep(TimeSpan.FromSeconds(left));
    }
}

下面是异步执行的方法(只有潜在的等待是异步的):

public static async Task RateLimitedForEachAsync<T>(
    this List<T> list,
    double minumumDelay,
    Action<T> action)
{
    foreach (var item in list)
    {
        Stopwatch sw = Stopwatch.StartNew();

        action(item);

        double left = minumumDelay - sw.Elapsed.TotalSeconds;

        if (left > 0)
            await Task.Delay(TimeSpan.FromSeconds(left));
    }
}    

请注意,您可以更改异步版本以使操作像这样自异步:

public static async Task RateLimitedForEachAsync<T>(
    this List<T> list,
    double minumumDelay,
    Func<T,Task> async_task_func)
{
    foreach (var item in list)
    {
        Stopwatch sw = Stopwatch.StartNew();

        await async_task_func(item);

        double left = minumumDelay - sw.Elapsed.TotalSeconds;

        if (left > 0)
            await Task.Delay(TimeSpan.FromSeconds(left));

    }
}    

如果您需要对每一项 运行 执行异步操作,这将很有帮助。

最后一个版本可以这样使用:

List<string> list = new List<string>();

list.Add("1");
list.Add("2");

var task = list.RateLimitedForEachAsync(1.0, async str =>
{
    //Do something asynchronous here, e.g.:
    await Task.Delay(500);
    Console.WriteLine(DateTime.Now + ": " + str);
});

现在您应该等待 task 完成。如果是Main方法,那么就需要这样同步等待:

task.Wait();

另一方面,如果您在异步方法中,则需要像这样异步等待:

await task;

RX Throttle 是不是你想要的?

https://msdn.microsoft.com/en-us/library/hh229400(v=vs.103).aspx

您需要了解的概念是,主线程不会等待您的 RateLimitedForEach 调用完成。此外 - 在您的控制台应用程序上 - 一旦主线程结束,进程就会结束。

这是什么意思?这意味着无论 RateLimitedForEach 上的观察者是否完成执行,进程都会结束。

注意:用户可能仍会强制执行您的应用程序以完成,这是一件好事。如果您希望在不挂起 UI 的情况下等待,您可以使用表单应用程序,如果您不希望用户关闭与进程相关的 windows,您可以使用服务。


使用 Task 是 我在下面介绍的内容。

请注意,在控制台应用程序上使用任务时,您仍然需要等待任务以防止主线程在 RateLimitedForEach 完成其工作之前完成。仍然建议远离控制台应用程序。


如果您坚持继续使用您的代码,您可以调整它以挂起调用线程直到完成:

public static void RateLimitedForEach<T>
(
    this List<T> list,
    double minumumDelay,
    Action<T> action
)
{
    using (var waitHandle = new ManualResetEventSlim(false))
    {

        var mainObservable = list.ToObservable();
        var intervalObservable = Observable.Interval(TimeSpan.FromSeconds(minumumDelay));  
        var zipObservable = mainObservable .Zip(intervalObservable, (v, _) => v);
        zipObservable.Subscribe
        (
            action,
            error => GC.KeepAlive(error), // Ingoring them, as you already were
            () => waitHandle.Set() // <-- "Done signal"
        );

        waitHandle.Wait(); // <--- Waiting on the observer to complete
    }
}

您的代码近乎完美。

试试这个:

public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
{
    list
        .ToObservable()
        .Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
        .Do(action)
        .ToArray()
        .Wait();
}