用于限制每秒请求数的信号量不起作用
Semaphore for limiting requests per second doesn't work
我正在使用 Google Analytics,该服务有 10 个并发请求的限制。我不得不以某种方式限制我的 API,所以我决定使用信号量,但它似乎不起作用。所有请求同时触发。我在我的代码中找不到问题。
public async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
{
var todayVisits = _googleAnalyticsService.GetTodayVisitsNumber();
var todayTraffic = _googleAnalyticsService.GetTodayTraffic();
var newAndReturningUsers = _googleAnalyticsService.GetNewAndReturningUsersNumber();
var averageSessionDuration = _googleAnalyticsService.GetAverageSessionDuration();
var deviceCategory = _googleAnalyticsService.GetSessionNumberByDeviceCategory();
var topPages = _googleAnalyticsService.GetTodaysTopPages();
var guestsAndRegisteredUsers = _googleAnalyticsService.GetGuestsVsRegisteredUsers();
var averageNumberOfSessionsPerDay = _googleAnalyticsService.GetAverageSessionsNumber();
var visitsPerWeekday = _googleAnalyticsService.GetTrafficByWeekday();
var visitsByHours = _googleAnalyticsService.GetTrafficByTimeOfDay();
var usersByPrefectures = _googleAnalyticsService.GetUsersByPrefectures();
var usersByCountry = _googleAnalyticsService.GetUsersByCountry();
var tasks = new List<Task>()
{
todayVisits, todayTraffic, newAndReturningUsers,
averageSessionDuration, deviceCategory, topPages,
guestsAndRegisteredUsers, averageNumberOfSessionsPerDay, visitsPerWeekday,
visitsByHours, usersByPrefectures, usersByCountry
};
var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);
foreach(var task in tasks)
{
await throttler.WaitAsync();
try
{
await task;
await Task.Delay(1000); // It's important due to limits of Google Analytics requests (10 queries per second per IP address)
}
finally
{
throttler.Release();
}
}
await Task.WhenAll(tasks);
return new SiteAnalyticsDTO()
{
TodayVisits = await todayVisits,
TodayTraffic = await todayTraffic,
NewAndReturningUsers = await newAndReturningUsers,
AverageSessionDuration = await averageSessionDuration,
DeviceCategory = await deviceCategory,
TopPages = await topPages,
GuestsAndRegisteredUsers = await guestsAndRegisteredUsers,
AverageNumberOfSessionsPerDay = await averageNumberOfSessionsPerDay,
VisitsPerWeekday = await visitsPerWeekday,
VisitsByHours = await visitsByHours,
UsersByPrefectures = await usersByPrefectures,
UsersByCountry = await usersByCountry
};
}
下面是 Google 分析调用的一些示例方法:
public async Task<int> GetTodayVisitsNumber(List<long> listingIds = null)
{
string filter = GetFilter(listingIds);
var getReportsRequest = GetReportsRequestModel(GetTodayDateRange(), "ga:sessionCount", "ga:sessions", _configuration.MainViewId, filter);
var response = await _service.Reports.BatchGet(getReportsRequest).ExecuteAsync();
Console.WriteLine(response);
var data = response.Reports.FirstOrDefault();
return Convert.ToInt32(data?.Data.Totals[0].Values[0]);
}
All requests are triggered simultaneously.
来看看这里
var todayVisits = _googleAnalyticsService.GetTodayVisitsNumber();
var todayTraffic = _googleAnalyticsService.GetTodayTraffic();
var newAndReturningUsers = _googleAnalyticsService.GetNewAndReturningUsersNumber();
var averageSessionDuration = _googleAnalyticsService.GetAverageSessionDuration();
var deviceCategory = _googleAnalyticsService.GetSessionNumberByDeviceCategory();
var topPages = _googleAnalyticsService.GetTodaysTopPages();
var guestsAndRegisteredUsers = _googleAnalyticsService.GetGuestsVsRegisteredUsers();
var averageNumberOfSessionsPerDay = _googleAnalyticsService.GetAverageSessionsNumber();
var visitsPerWeekday = _googleAnalyticsService.GetTrafficByWeekday();
var visitsByHours = _googleAnalyticsService.GetTrafficByTimeOfDay();
var usersByPrefectures = _googleAnalyticsService.GetUsersByPrefectures();
var usersByCountry = _googleAnalyticsService.GetUsersByCountry();
您正在存储每个方法的结果。当您使用括号标记(例如“methodName();
”时,您调用了该方法并将结果存储在 var
.
中
然后将这些方法的结果存储在一个列表中,然后 await
每个方法都带有 Semaphore
以限制一次可以等待的任务数。
问题是:每个 await
立即完成,因为您在上面最初调用它们时已经 等待 (同步)它们。
这让你相信 SemaphoreSlim
没有工作,因为如果每个 Task
在等待时立即 returns(因为它们已经被调用)那么那里他们之间没有时间。
存储 async
方法以备后用,而不是一次调用它们。
您不能像 var
那样存储委托,您必须将它们存储在显式类型的变量 Func<TResult>
.
中
例如:
Func<Task<object>> todayVisits = _googleAnalyticsService.GetTodayVisitsNumber;
编者注意,我不知道这些方法是什么return我用对象替换得尽可能通用
现在 - 如果我们将每个都存储在一个变量中会非常麻烦,所以我们不要将它们存储在单独的变量中,而是直接将它们放在一个列表中,如下所示:
var awaitableTasks = new List<Func<Task<object>>>()
{
_googleAnalyticsService.GetTodayVisitsNumber,
_googleAnalyticsService.GetTodayTraffic,
_googleAnalyticsService.GetNewAndReturningUsersNumber,
_googleAnalyticsService.GetAverageSessionDuration,
_googleAnalyticsService.GetSessionNumberByDeviceCategory,
_googleAnalyticsService.GetTodaysTopPages,
_googleAnalyticsService.GetGuestsVsRegisteredUsers,
_googleAnalyticsService.GetAverageSessionsNumber,
_googleAnalyticsService.GetTrafficByWeekday,
_googleAnalyticsService.GetTrafficByTimeOfDay,
_googleAnalyticsService.GetUsersByPrefectures,
_googleAnalyticsService.GetUsersByCountry
};
因为这些新对象本身不是任务,而是 return 和 Task
的方法,我们必须改变我们存储和调用它们的方式,为此我们将使用本地方法,所以我将回顾我所做的每一项更改。
让我们创建那个 Semaphore
并创建一个我们可以放置任务以跟踪它们的地方。
让我们也创建一个地方,我们可以在 await
时存储每个任务的结果。
var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);
var tasks = new List<Task>();
ConcurrentDictionary<string, object> results = new();
让我们创建一个具有几个职责的本地方法
- 接受一个
Func<Task<object>>
作为参数
Await
方法
- 把那个方法的结果放在我们以后可以得到的地方
- 释放
Semphore
即使遇到错误
async Task Worker(Func<Task<object>> awaitableFunc)
{
try
{
resultDict.TryAdd(awaitableFunc.GetMethodInfo().Name, await awaitableFunc());
}
finally
{
throttler.Release();
}
}
编者注:您可以使用 lambda 表达式完成同样的事情,但为了清晰和格式化,我更喜欢使用本地方法。
启动工作器并存储它们 return 的任务。
这样..如果在创建最后一对时它们还没有完成,那么我们可以在创建最终对象之前等待它们完成(因为我们将需要它们提供的所有结果创建最终对象)。
foreach (var task in awaitableTasks)
{
await throttler.WaitAsync();
tasks.Add(Task.Run(() => Worker(task)));
}
// wait for the tasks to finish
await Task.WhenAll(tasks);
创建最终对象,然后 return 它。
return new SiteAnalyticsDTO()
{
TodayVisits = resultDict[nameof(_googleAnalyticsService.GetTodayVisitsNumber)],
TodayTraffic = resultDict[nameof(_googleAnalyticsService.GetTodayTraffic)],
NewAndReturningUsers = resultDict[nameof(_googleAnalyticsService.GetNewAndReturningUsersNumber)],
AverageSessionDuration = resultDict[nameof(_googleAnalyticsService.GetAverageSessionDuration)],
DeviceCategory = resultDict[nameof(_googleAnalyticsService.GetSessionNumberByDeviceCategory)],
TopPages = resultDict[nameof(_googleAnalyticsService.GetTodaysTopPages)],
GuestsAndRegisteredUsers = resultDict[nameof(_googleAnalyticsService.GetGuestsVsRegisteredUsers)],
AverageNumberOfSessionsPerDay = resultDict[nameof(_googleAnalyticsService.GetAverageSessionsNumber)],
VisitsPerWeekday = resultDict[nameof(_googleAnalyticsService.GetTrafficByWeekday)],
VisitsByHours = resultDict[nameof(_googleAnalyticsService.GetTrafficByTimeOfDay)],
UsersByPrefectures = resultDict[nameof(_googleAnalyticsService.GetUsersByPrefectures)],
UsersByCountry = resultDict[nameof(_googleAnalyticsService.GetUsersByCountry)]
};
将所有内容放在一起,我认为我们有一些可能有用的东西,或者至少可以很容易地修改以满足您的需求。
public static async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
{
// store these methods so we can iterate and execute them later
var awaitableTasks = new List<Func<Task<object>>>()
{
_googleAnalyticsService.GetTodayVisitsNumber,
_googleAnalyticsService.GetTodayTraffic,
_googleAnalyticsService.GetNewAndReturningUsersNumber,
_googleAnalyticsService.GetAverageSessionDuration,
_googleAnalyticsService.GetSessionNumberByDeviceCategory,
_googleAnalyticsService.GetTodaysTopPages,
_googleAnalyticsService.GetGuestsVsRegisteredUsers,
_googleAnalyticsService.GetAverageSessionsNumber,
_googleAnalyticsService.GetTrafficByWeekday,
_googleAnalyticsService.GetTrafficByTimeOfDay,
_googleAnalyticsService.GetUsersByPrefectures,
_googleAnalyticsService.GetUsersByCountry
};
// create a way to limit the number of concurrent requests
var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);
// create a place to store the tasks we create
var finalTasks = new List<Task>();
// make sure we have some where to put our results
ConcurrentDictionary<string, object> resultDict = new();
// make a worker that accepts one of those methods, invokes it
// then adds the result to the dict
async Task Worker(Func<Task<object>> awaitableFunc)
{
try
{
resultDict.TryAdd(awaitableFunc.GetMethodInfo().Name, await awaitableFunc());
}
finally
{
// make sure even if we encounter an error we still release the semphore
throttler.Release();
}
}
// iterate over the tasks, wait for the sempahore
// when we get a slot, create a worker and send it to the background
foreach (var task in awaitableTasks)
{
await throttler.WaitAsync();
finalTasks.Add(Task.Run(() => Worker(task)));
}
// wait for any remaining tasks to finish up in the background if they are still running
await Task.WhenAll(finalTasks);
// create the return object from the results of the dictionary
return new SiteAnalyticsDTO()
{
TodayVisits = resultDict[nameof(_googleAnalyticsService.GetTodayVisitsNumber)],
TodayTraffic = resultDict[nameof(_googleAnalyticsService.GetTodayTraffic)],
NewAndReturningUsers = resultDict[nameof(_googleAnalyticsService.GetNewAndReturningUsersNumber)],
AverageSessionDuration = resultDict[nameof(_googleAnalyticsService.GetAverageSessionDuration)],
DeviceCategory = resultDict[nameof(_googleAnalyticsService.GetSessionNumberByDeviceCategory)],
TopPages = resultDict[nameof(_googleAnalyticsService.GetTodaysTopPages)],
GuestsAndRegisteredUsers = resultDict[nameof(_googleAnalyticsService.GetGuestsVsRegisteredUsers)],
AverageNumberOfSessionsPerDay = resultDict[nameof(_googleAnalyticsService.GetAverageSessionsNumber)],
VisitsPerWeekday = resultDict[nameof(_googleAnalyticsService.GetTrafficByWeekday)],
VisitsByHours = resultDict[nameof(_googleAnalyticsService.GetTrafficByTimeOfDay)],
UsersByPrefectures = resultDict[nameof(_googleAnalyticsService.GetUsersByPrefectures)],
UsersByCountry = resultDict[nameof(_googleAnalyticsService.GetUsersByCountry)]
};
}
您设置的问题是所有任务同时启动,只有它们的等待受到限制。限制等待没有任何用处。只有 你的 延续被延迟。目标服务批量接收所有请求。
我的建议是使用专用的class来封装节流逻辑。看来您需要限制发送请求的并发性和速率,并且这些限制中的每一个都可以通过使用单独的 SemaphoreSlim
来实现。这是一个简单的实现:
public class ThrottledExecution
{
private readonly SemaphoreSlim _concurrencySemaphore;
private readonly SemaphoreSlim _delaySemaphore;
private readonly TimeSpan _delay;
public ThrottledExecution(int concurrencyLimit, TimeSpan rateLimitTime,
int rateLimitCount)
{
// Arguments validation omitted
_concurrencySemaphore = new SemaphoreSlim(concurrencyLimit, concurrencyLimit);
_delaySemaphore = new SemaphoreSlim(rateLimitCount, rateLimitCount);
_delay = rateLimitTime;
}
public async Task<TResult> Run<TResult>(Func<Task<TResult>> action)
{
await _delaySemaphore.WaitAsync();
ScheduleDelaySemaphoreRelease();
await _concurrencySemaphore.WaitAsync();
try { return await action().ConfigureAwait(false); }
finally { _concurrencySemaphore.Release(); }
}
private async void ScheduleDelaySemaphoreRelease()
{
await Task.Delay(_delay).ConfigureAwait(false);
_delaySemaphore.Release();
}
}
以下是您的使用方法:
public async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
{
var throttler = new ThrottledExecution(MaxRequests, TimeSpan.FromSeconds(1), 1);
var todayVisits = throttler.Run(() => _service.GetTodayVisitsNumber());
var todayTraffic = throttler.Run(() => _service.GetTodayTraffic());
var newAndReturningUsers = throttler.Run(() => _service.GetNewAndReturningUsersNumber());
var averageSessionDuration = throttler.Run(() => _service.GetAverageSessionDuration());
var deviceCategory = throttler.Run(() => _service.GetSessionNumberByDeviceCategory());
var topPages = throttler.Run(() => _service.GetTodaysTopPages());
var guestsAndRegisteredUsers = throttler.Run(() => _service.GetGuestsVsRegisteredUsers());
var averageNumberOfSessionsPerDay = throttler.Run(() => _service.GetAverageSessionsNumber());
var visitsPerWeekday = throttler.Run(() => _service.GetTrafficByWeekday());
var visitsByHours = throttler.Run(() => _service.GetTrafficByTimeOfDay());
var usersByPrefectures = throttler.Run(() => _service.GetUsersByPrefectures());
var usersByCountry = throttler.Run(() => _service.GetUsersByCountry());
var tasks = new List<Task>()
{
todayVisits, todayTraffic, newAndReturningUsers,
averageSessionDuration, deviceCategory, topPages,
guestsAndRegisteredUsers, averageNumberOfSessionsPerDay, visitsPerWeekday,
visitsByHours, usersByPrefectures, usersByCountry
};
await Task.WhenAll(tasks);
return new SiteAnalyticsDTO()
{
TodayVisits = await todayVisits,
TodayTraffic = await todayTraffic,
NewAndReturningUsers = await newAndReturningUsers,
AverageSessionDuration = await averageSessionDuration,
DeviceCategory = await deviceCategory,
TopPages = await topPages,
GuestsAndRegisteredUsers = await guestsAndRegisteredUsers,
AverageNumberOfSessionsPerDay = await averageNumberOfSessionsPerDay,
VisitsPerWeekday = await visitsPerWeekday,
VisitsByHours = await visitsByHours,
UsersByPrefectures = await usersByPrefectures,
UsersByCountry = await usersByCountry,
};
}
看来部分成功的结果对你没有用,所以你可以考虑在ThrottledExecution
class里面加入一些自动取消的逻辑。如果任务失败,则应取消所有挂起的和后续的异步操作。
我正在使用 Google Analytics,该服务有 10 个并发请求的限制。我不得不以某种方式限制我的 API,所以我决定使用信号量,但它似乎不起作用。所有请求同时触发。我在我的代码中找不到问题。
public async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
{
var todayVisits = _googleAnalyticsService.GetTodayVisitsNumber();
var todayTraffic = _googleAnalyticsService.GetTodayTraffic();
var newAndReturningUsers = _googleAnalyticsService.GetNewAndReturningUsersNumber();
var averageSessionDuration = _googleAnalyticsService.GetAverageSessionDuration();
var deviceCategory = _googleAnalyticsService.GetSessionNumberByDeviceCategory();
var topPages = _googleAnalyticsService.GetTodaysTopPages();
var guestsAndRegisteredUsers = _googleAnalyticsService.GetGuestsVsRegisteredUsers();
var averageNumberOfSessionsPerDay = _googleAnalyticsService.GetAverageSessionsNumber();
var visitsPerWeekday = _googleAnalyticsService.GetTrafficByWeekday();
var visitsByHours = _googleAnalyticsService.GetTrafficByTimeOfDay();
var usersByPrefectures = _googleAnalyticsService.GetUsersByPrefectures();
var usersByCountry = _googleAnalyticsService.GetUsersByCountry();
var tasks = new List<Task>()
{
todayVisits, todayTraffic, newAndReturningUsers,
averageSessionDuration, deviceCategory, topPages,
guestsAndRegisteredUsers, averageNumberOfSessionsPerDay, visitsPerWeekday,
visitsByHours, usersByPrefectures, usersByCountry
};
var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);
foreach(var task in tasks)
{
await throttler.WaitAsync();
try
{
await task;
await Task.Delay(1000); // It's important due to limits of Google Analytics requests (10 queries per second per IP address)
}
finally
{
throttler.Release();
}
}
await Task.WhenAll(tasks);
return new SiteAnalyticsDTO()
{
TodayVisits = await todayVisits,
TodayTraffic = await todayTraffic,
NewAndReturningUsers = await newAndReturningUsers,
AverageSessionDuration = await averageSessionDuration,
DeviceCategory = await deviceCategory,
TopPages = await topPages,
GuestsAndRegisteredUsers = await guestsAndRegisteredUsers,
AverageNumberOfSessionsPerDay = await averageNumberOfSessionsPerDay,
VisitsPerWeekday = await visitsPerWeekday,
VisitsByHours = await visitsByHours,
UsersByPrefectures = await usersByPrefectures,
UsersByCountry = await usersByCountry
};
}
下面是 Google 分析调用的一些示例方法:
public async Task<int> GetTodayVisitsNumber(List<long> listingIds = null)
{
string filter = GetFilter(listingIds);
var getReportsRequest = GetReportsRequestModel(GetTodayDateRange(), "ga:sessionCount", "ga:sessions", _configuration.MainViewId, filter);
var response = await _service.Reports.BatchGet(getReportsRequest).ExecuteAsync();
Console.WriteLine(response);
var data = response.Reports.FirstOrDefault();
return Convert.ToInt32(data?.Data.Totals[0].Values[0]);
}
All requests are triggered simultaneously.
来看看这里
var todayVisits = _googleAnalyticsService.GetTodayVisitsNumber(); var todayTraffic = _googleAnalyticsService.GetTodayTraffic(); var newAndReturningUsers = _googleAnalyticsService.GetNewAndReturningUsersNumber(); var averageSessionDuration = _googleAnalyticsService.GetAverageSessionDuration(); var deviceCategory = _googleAnalyticsService.GetSessionNumberByDeviceCategory(); var topPages = _googleAnalyticsService.GetTodaysTopPages(); var guestsAndRegisteredUsers = _googleAnalyticsService.GetGuestsVsRegisteredUsers(); var averageNumberOfSessionsPerDay = _googleAnalyticsService.GetAverageSessionsNumber(); var visitsPerWeekday = _googleAnalyticsService.GetTrafficByWeekday(); var visitsByHours = _googleAnalyticsService.GetTrafficByTimeOfDay(); var usersByPrefectures = _googleAnalyticsService.GetUsersByPrefectures(); var usersByCountry = _googleAnalyticsService.GetUsersByCountry();
您正在存储每个方法的结果。当您使用括号标记(例如“methodName();
”时,您调用了该方法并将结果存储在 var
.
然后将这些方法的结果存储在一个列表中,然后 await
每个方法都带有 Semaphore
以限制一次可以等待的任务数。
问题是:每个 await
立即完成,因为您在上面最初调用它们时已经 等待 (同步)它们。
这让你相信 SemaphoreSlim
没有工作,因为如果每个 Task
在等待时立即 returns(因为它们已经被调用)那么那里他们之间没有时间。
存储 async
方法以备后用,而不是一次调用它们。
您不能像 var
那样存储委托,您必须将它们存储在显式类型的变量 Func<TResult>
.
例如:
Func<Task<object>> todayVisits = _googleAnalyticsService.GetTodayVisitsNumber;
编者注意,我不知道这些方法是什么return我用对象替换得尽可能通用
现在 - 如果我们将每个都存储在一个变量中会非常麻烦,所以我们不要将它们存储在单独的变量中,而是直接将它们放在一个列表中,如下所示:
var awaitableTasks = new List<Func<Task<object>>>()
{
_googleAnalyticsService.GetTodayVisitsNumber,
_googleAnalyticsService.GetTodayTraffic,
_googleAnalyticsService.GetNewAndReturningUsersNumber,
_googleAnalyticsService.GetAverageSessionDuration,
_googleAnalyticsService.GetSessionNumberByDeviceCategory,
_googleAnalyticsService.GetTodaysTopPages,
_googleAnalyticsService.GetGuestsVsRegisteredUsers,
_googleAnalyticsService.GetAverageSessionsNumber,
_googleAnalyticsService.GetTrafficByWeekday,
_googleAnalyticsService.GetTrafficByTimeOfDay,
_googleAnalyticsService.GetUsersByPrefectures,
_googleAnalyticsService.GetUsersByCountry
};
因为这些新对象本身不是任务,而是 return 和 Task
的方法,我们必须改变我们存储和调用它们的方式,为此我们将使用本地方法,所以我将回顾我所做的每一项更改。
让我们创建那个 Semaphore
并创建一个我们可以放置任务以跟踪它们的地方。
让我们也创建一个地方,我们可以在 await
时存储每个任务的结果。
var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);
var tasks = new List<Task>();
ConcurrentDictionary<string, object> results = new();
让我们创建一个具有几个职责的本地方法
- 接受一个
Func<Task<object>>
作为参数 Await
方法- 把那个方法的结果放在我们以后可以得到的地方
- 释放
Semphore
即使遇到错误
async Task Worker(Func<Task<object>> awaitableFunc)
{
try
{
resultDict.TryAdd(awaitableFunc.GetMethodInfo().Name, await awaitableFunc());
}
finally
{
throttler.Release();
}
}
编者注:您可以使用 lambda 表达式完成同样的事情,但为了清晰和格式化,我更喜欢使用本地方法。
启动工作器并存储它们 return 的任务。
这样..如果在创建最后一对时它们还没有完成,那么我们可以在创建最终对象之前等待它们完成(因为我们将需要它们提供的所有结果创建最终对象)。
foreach (var task in awaitableTasks)
{
await throttler.WaitAsync();
tasks.Add(Task.Run(() => Worker(task)));
}
// wait for the tasks to finish
await Task.WhenAll(tasks);
创建最终对象,然后 return 它。
return new SiteAnalyticsDTO()
{
TodayVisits = resultDict[nameof(_googleAnalyticsService.GetTodayVisitsNumber)],
TodayTraffic = resultDict[nameof(_googleAnalyticsService.GetTodayTraffic)],
NewAndReturningUsers = resultDict[nameof(_googleAnalyticsService.GetNewAndReturningUsersNumber)],
AverageSessionDuration = resultDict[nameof(_googleAnalyticsService.GetAverageSessionDuration)],
DeviceCategory = resultDict[nameof(_googleAnalyticsService.GetSessionNumberByDeviceCategory)],
TopPages = resultDict[nameof(_googleAnalyticsService.GetTodaysTopPages)],
GuestsAndRegisteredUsers = resultDict[nameof(_googleAnalyticsService.GetGuestsVsRegisteredUsers)],
AverageNumberOfSessionsPerDay = resultDict[nameof(_googleAnalyticsService.GetAverageSessionsNumber)],
VisitsPerWeekday = resultDict[nameof(_googleAnalyticsService.GetTrafficByWeekday)],
VisitsByHours = resultDict[nameof(_googleAnalyticsService.GetTrafficByTimeOfDay)],
UsersByPrefectures = resultDict[nameof(_googleAnalyticsService.GetUsersByPrefectures)],
UsersByCountry = resultDict[nameof(_googleAnalyticsService.GetUsersByCountry)]
};
将所有内容放在一起,我认为我们有一些可能有用的东西,或者至少可以很容易地修改以满足您的需求。
public static async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
{
// store these methods so we can iterate and execute them later
var awaitableTasks = new List<Func<Task<object>>>()
{
_googleAnalyticsService.GetTodayVisitsNumber,
_googleAnalyticsService.GetTodayTraffic,
_googleAnalyticsService.GetNewAndReturningUsersNumber,
_googleAnalyticsService.GetAverageSessionDuration,
_googleAnalyticsService.GetSessionNumberByDeviceCategory,
_googleAnalyticsService.GetTodaysTopPages,
_googleAnalyticsService.GetGuestsVsRegisteredUsers,
_googleAnalyticsService.GetAverageSessionsNumber,
_googleAnalyticsService.GetTrafficByWeekday,
_googleAnalyticsService.GetTrafficByTimeOfDay,
_googleAnalyticsService.GetUsersByPrefectures,
_googleAnalyticsService.GetUsersByCountry
};
// create a way to limit the number of concurrent requests
var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);
// create a place to store the tasks we create
var finalTasks = new List<Task>();
// make sure we have some where to put our results
ConcurrentDictionary<string, object> resultDict = new();
// make a worker that accepts one of those methods, invokes it
// then adds the result to the dict
async Task Worker(Func<Task<object>> awaitableFunc)
{
try
{
resultDict.TryAdd(awaitableFunc.GetMethodInfo().Name, await awaitableFunc());
}
finally
{
// make sure even if we encounter an error we still release the semphore
throttler.Release();
}
}
// iterate over the tasks, wait for the sempahore
// when we get a slot, create a worker and send it to the background
foreach (var task in awaitableTasks)
{
await throttler.WaitAsync();
finalTasks.Add(Task.Run(() => Worker(task)));
}
// wait for any remaining tasks to finish up in the background if they are still running
await Task.WhenAll(finalTasks);
// create the return object from the results of the dictionary
return new SiteAnalyticsDTO()
{
TodayVisits = resultDict[nameof(_googleAnalyticsService.GetTodayVisitsNumber)],
TodayTraffic = resultDict[nameof(_googleAnalyticsService.GetTodayTraffic)],
NewAndReturningUsers = resultDict[nameof(_googleAnalyticsService.GetNewAndReturningUsersNumber)],
AverageSessionDuration = resultDict[nameof(_googleAnalyticsService.GetAverageSessionDuration)],
DeviceCategory = resultDict[nameof(_googleAnalyticsService.GetSessionNumberByDeviceCategory)],
TopPages = resultDict[nameof(_googleAnalyticsService.GetTodaysTopPages)],
GuestsAndRegisteredUsers = resultDict[nameof(_googleAnalyticsService.GetGuestsVsRegisteredUsers)],
AverageNumberOfSessionsPerDay = resultDict[nameof(_googleAnalyticsService.GetAverageSessionsNumber)],
VisitsPerWeekday = resultDict[nameof(_googleAnalyticsService.GetTrafficByWeekday)],
VisitsByHours = resultDict[nameof(_googleAnalyticsService.GetTrafficByTimeOfDay)],
UsersByPrefectures = resultDict[nameof(_googleAnalyticsService.GetUsersByPrefectures)],
UsersByCountry = resultDict[nameof(_googleAnalyticsService.GetUsersByCountry)]
};
}
您设置的问题是所有任务同时启动,只有它们的等待受到限制。限制等待没有任何用处。只有 你的 延续被延迟。目标服务批量接收所有请求。
我的建议是使用专用的class来封装节流逻辑。看来您需要限制发送请求的并发性和速率,并且这些限制中的每一个都可以通过使用单独的 SemaphoreSlim
来实现。这是一个简单的实现:
public class ThrottledExecution
{
private readonly SemaphoreSlim _concurrencySemaphore;
private readonly SemaphoreSlim _delaySemaphore;
private readonly TimeSpan _delay;
public ThrottledExecution(int concurrencyLimit, TimeSpan rateLimitTime,
int rateLimitCount)
{
// Arguments validation omitted
_concurrencySemaphore = new SemaphoreSlim(concurrencyLimit, concurrencyLimit);
_delaySemaphore = new SemaphoreSlim(rateLimitCount, rateLimitCount);
_delay = rateLimitTime;
}
public async Task<TResult> Run<TResult>(Func<Task<TResult>> action)
{
await _delaySemaphore.WaitAsync();
ScheduleDelaySemaphoreRelease();
await _concurrencySemaphore.WaitAsync();
try { return await action().ConfigureAwait(false); }
finally { _concurrencySemaphore.Release(); }
}
private async void ScheduleDelaySemaphoreRelease()
{
await Task.Delay(_delay).ConfigureAwait(false);
_delaySemaphore.Release();
}
}
以下是您的使用方法:
public async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
{
var throttler = new ThrottledExecution(MaxRequests, TimeSpan.FromSeconds(1), 1);
var todayVisits = throttler.Run(() => _service.GetTodayVisitsNumber());
var todayTraffic = throttler.Run(() => _service.GetTodayTraffic());
var newAndReturningUsers = throttler.Run(() => _service.GetNewAndReturningUsersNumber());
var averageSessionDuration = throttler.Run(() => _service.GetAverageSessionDuration());
var deviceCategory = throttler.Run(() => _service.GetSessionNumberByDeviceCategory());
var topPages = throttler.Run(() => _service.GetTodaysTopPages());
var guestsAndRegisteredUsers = throttler.Run(() => _service.GetGuestsVsRegisteredUsers());
var averageNumberOfSessionsPerDay = throttler.Run(() => _service.GetAverageSessionsNumber());
var visitsPerWeekday = throttler.Run(() => _service.GetTrafficByWeekday());
var visitsByHours = throttler.Run(() => _service.GetTrafficByTimeOfDay());
var usersByPrefectures = throttler.Run(() => _service.GetUsersByPrefectures());
var usersByCountry = throttler.Run(() => _service.GetUsersByCountry());
var tasks = new List<Task>()
{
todayVisits, todayTraffic, newAndReturningUsers,
averageSessionDuration, deviceCategory, topPages,
guestsAndRegisteredUsers, averageNumberOfSessionsPerDay, visitsPerWeekday,
visitsByHours, usersByPrefectures, usersByCountry
};
await Task.WhenAll(tasks);
return new SiteAnalyticsDTO()
{
TodayVisits = await todayVisits,
TodayTraffic = await todayTraffic,
NewAndReturningUsers = await newAndReturningUsers,
AverageSessionDuration = await averageSessionDuration,
DeviceCategory = await deviceCategory,
TopPages = await topPages,
GuestsAndRegisteredUsers = await guestsAndRegisteredUsers,
AverageNumberOfSessionsPerDay = await averageNumberOfSessionsPerDay,
VisitsPerWeekday = await visitsPerWeekday,
VisitsByHours = await visitsByHours,
UsersByPrefectures = await usersByPrefectures,
UsersByCountry = await usersByCountry,
};
}
看来部分成功的结果对你没有用,所以你可以考虑在ThrottledExecution
class里面加入一些自动取消的逻辑。如果任务失败,则应取消所有挂起的和后续的异步操作。