在 C# 中执行并发任务
Performing concurrent tasks in C#
我有一项服务需要尽快从 Amazon SQS 读取消息。我们预计流量很大,我希望能够读取超过 10K messages/second。不幸的是,我目前大约 10 messages/second。显然,我有工作要做。
这就是我正在使用的(已转换为控制台应用程序以使测试更容易):
private static int _concurrentRequests;
private static int _maxConcurrentRequests;
public static void Main(string[] args) {
_concurrentRequests = 0;
_maxConcurrentRequests = 100;
var timer = new Timer();
timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
timer.Interval = 10;
timer.Enabled = true;
Console.ReadLine();
timer.Dispose();
}
public static void OnTimedEvent(object s, ElapsedEventArgs e) {
if (_concurrentRequests < _maxConcurrentRequests) {
_concurrentRequests++;
ProcessMessages();
}
}
public static async Task ProcessMessages() {
var manager = new MessageManager();
manager.ProcessMessages(); // this is an async method that reads in the messages from SQS
_concurrentRequests--;
}
我没有接近 100 个并发请求,而且它似乎没有每 10 毫秒触发 OnTimedEvent
。
我不确定 Timer
是否是正确的方法。我对这种编码没有太多经验。在这一点上,我愿意尝试任何事情。
更新
多亏了 calebboyd,我离实现目标又近了一点。这是一些非常糟糕的代码:
private static SemaphoreSlim _locker;
public static void Main(string[] args) {
_manager = new MessageManager();
RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
_locker = new SemaphoreSlim(10, 10);
while (true) {
Thread thread = new Thread(new ParameterizedThreadStart(Process));
thread.Start();
}
}
private static async void Process(object args) {
_locker.WaitAsync();
try {
await _manager.ProcessMessages();
}
finally {
_locker.Release();
}
}
我能够接近每秒读取可观数量的消息,但问题是我的 ProcessMessages
调用永远不会完成(或者可能会在很长时间后完成)。我在想我可能需要在任何时候限制我拥有的线程数 运行。
关于如何改进此代码以便 ProcessMessages
有机会完成的任何建议?
我假设异步方法在线程池中排队,线程池的线程数与可用处理器的线程数一样多。您可能会生成 100 个请求,但它们仍然由 8 个线程执行。尝试创建 N 个线程的数组并使用它们。
因为你的 MessageManager 对象上的 ProcessMessages
方法没有被等待,所以我假设它绑定到执行它的同一个线程。仅仅将函数标记为 async
不会传递工作到一个新的线程。有了这个假设,这段代码实际上并不是用多线程执行的。您可以使用以下代码在更多线程池中执行您的代码。
管理器对象可能无法处理并发使用。所以我在 Task.Run lambda 中创建它。这也可能很昂贵,因此不切实际。
async Task RunBatchProcessingForeverAsync () {
var lock = new SemaphoreSlim(initialCount: 10);
while (true) {
await lock.WaitAsync();
Task.Run(() => {
try {
var manager = new MessageManager();
manager.ProcessMessages();
} finally {
lock.Release();
}
});
}
}
我已经有一段时间没有编写 C# 了,但这应该 运行 你的方法同时、重复、永远重复 10 次。
正如@calebboyd 所建议的,您必须首先使您的线程异步。现在,如果你去这里——
,您会发现一个异步线程足以快速汇集网络资源。如果您能够在单个请求中从亚马逊获取多条消息,那么您的生产者线程(对亚马逊进行异步调用的线程)就可以了——它每秒可以发送数百个请求。这不会是您的瓶颈。但是,处理接收到的数据的后续任务将交给线程池。在这里您有可能遇到瓶颈 - 假设每秒有 100 个响应到达,每个响应包含 100 条消息(达到您的 10K msgs/sec 近似值)。每秒您有 100 个新任务,每个任务都需要您的线程处理 100 条消息。现在有两种选择:(1) 这些消息的处理不受 CPU 约束 - 您只需将它们发送到您的数据库,或者 (2),您执行 CPU 消耗性计算,例如科学计算、序列化或一些繁重的业务逻辑。如果 (1) 是你的情况,那么瓶颈就会向后推向你的数据库。如果 (2),那么您别无选择,只能扩大/缩小或优化计算。但是你的瓶颈可能不是生产线程 - 如果它实施正确(参见上面的 link 示例)。
我有一项服务需要尽快从 Amazon SQS 读取消息。我们预计流量很大,我希望能够读取超过 10K messages/second。不幸的是,我目前大约 10 messages/second。显然,我有工作要做。
这就是我正在使用的(已转换为控制台应用程序以使测试更容易):
private static int _concurrentRequests;
private static int _maxConcurrentRequests;
public static void Main(string[] args) {
_concurrentRequests = 0;
_maxConcurrentRequests = 100;
var timer = new Timer();
timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
timer.Interval = 10;
timer.Enabled = true;
Console.ReadLine();
timer.Dispose();
}
public static void OnTimedEvent(object s, ElapsedEventArgs e) {
if (_concurrentRequests < _maxConcurrentRequests) {
_concurrentRequests++;
ProcessMessages();
}
}
public static async Task ProcessMessages() {
var manager = new MessageManager();
manager.ProcessMessages(); // this is an async method that reads in the messages from SQS
_concurrentRequests--;
}
我没有接近 100 个并发请求,而且它似乎没有每 10 毫秒触发 OnTimedEvent
。
我不确定 Timer
是否是正确的方法。我对这种编码没有太多经验。在这一点上,我愿意尝试任何事情。
更新
多亏了 calebboyd,我离实现目标又近了一点。这是一些非常糟糕的代码:
private static SemaphoreSlim _locker;
public static void Main(string[] args) {
_manager = new MessageManager();
RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
_locker = new SemaphoreSlim(10, 10);
while (true) {
Thread thread = new Thread(new ParameterizedThreadStart(Process));
thread.Start();
}
}
private static async void Process(object args) {
_locker.WaitAsync();
try {
await _manager.ProcessMessages();
}
finally {
_locker.Release();
}
}
我能够接近每秒读取可观数量的消息,但问题是我的 ProcessMessages
调用永远不会完成(或者可能会在很长时间后完成)。我在想我可能需要在任何时候限制我拥有的线程数 运行。
关于如何改进此代码以便 ProcessMessages
有机会完成的任何建议?
我假设异步方法在线程池中排队,线程池的线程数与可用处理器的线程数一样多。您可能会生成 100 个请求,但它们仍然由 8 个线程执行。尝试创建 N 个线程的数组并使用它们。
因为你的 MessageManager 对象上的 ProcessMessages
方法没有被等待,所以我假设它绑定到执行它的同一个线程。仅仅将函数标记为 async
不会传递工作到一个新的线程。有了这个假设,这段代码实际上并不是用多线程执行的。您可以使用以下代码在更多线程池中执行您的代码。
管理器对象可能无法处理并发使用。所以我在 Task.Run lambda 中创建它。这也可能很昂贵,因此不切实际。
async Task RunBatchProcessingForeverAsync () {
var lock = new SemaphoreSlim(initialCount: 10);
while (true) {
await lock.WaitAsync();
Task.Run(() => {
try {
var manager = new MessageManager();
manager.ProcessMessages();
} finally {
lock.Release();
}
});
}
}
我已经有一段时间没有编写 C# 了,但这应该 运行 你的方法同时、重复、永远重复 10 次。
正如@calebboyd 所建议的,您必须首先使您的线程异步。现在,如果你去这里——