在 C# 中执行并发任务

Performing concurrent tasks in C#

我有一项服务需要尽快从 Amazon SQS 读取消息。我们预计流量很大,我希望能够读取超过 10K messages/second。不幸的是,我目前大约 10 messages/second。显然,我有工作要做。

这就是我正在使用的(已转换为控制台应用程序以使测试更容易):

private static int _concurrentRequests;
private static int _maxConcurrentRequests;

public static void Main(string[] args) {
    _concurrentRequests = 0;
    _maxConcurrentRequests = 100;

    var timer = new Timer();
    timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
    timer.Interval = 10;
    timer.Enabled = true;

    Console.ReadLine();
    timer.Dispose();
}

public static void OnTimedEvent(object s, ElapsedEventArgs e) {
    if (_concurrentRequests < _maxConcurrentRequests) {
        _concurrentRequests++;
        ProcessMessages();
    }
}

public static async Task ProcessMessages() {
    var manager = new MessageManager();
    manager.ProcessMessages();  // this is an async method that reads in the messages from SQS

    _concurrentRequests--;
}

我没有接近 100 个并发请求,而且它似乎没有每 10 毫秒触发 OnTimedEvent

我不确定 Timer 是否是正确的方法。我对这种编码没有太多经验。在这一点上,我愿意尝试任何事情。

更新

多亏了 calebboyd,我离实现目标又近了一点。这是一些非常糟糕的代码:

private static SemaphoreSlim _locker;

public static void Main(string[] args) {
    _manager = new MessageManager();

    RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
    _locker = new SemaphoreSlim(10, 10);
    while (true) {
        Thread thread = new Thread(new ParameterizedThreadStart(Process));
        thread.Start();
    }
}

private static async void Process(object args) {
    _locker.WaitAsync();
    try {
        await _manager.ProcessMessages();
    }
    finally {
        _locker.Release();
    }

}

我能够接近每秒读取可观数量的消息,但问题是我的 ProcessMessages 调用永远不会完成(或者可能会在很长时间后完成)。我在想我可能需要在任何时候限制我拥有的线程数 运行。

关于如何改进此代码以便 ProcessMessages 有机会完成的任何建议?

我假设异步方法在线程池中排队,线程池的线程数与可用处理器的线程数一样多。您可能会生成 100 个请求,但它们仍然由 8 个线程执行。尝试创建 N 个线程的数组并使用它们。

因为你的 MessageManager 对象上的 ProcessMessages 方法没有被等待,所以我假设它绑定到执行它的同一个线程。仅仅将函数标记为 async 不会传递工作到一个新的线程。有了这个假设,这段代码实际上并不是用多线程执行的。您可以使用以下代码在更多线程池中执行您的代码。

管理器对象可能无法处理并发使用。所以我在 Task.Run lambda 中创建它。这也可能很昂贵,因此不切实际。

async Task RunBatchProcessingForeverAsync () {
    var lock = new SemaphoreSlim(initialCount: 10);
    while (true) {
        await lock.WaitAsync();
        Task.Run(() => {
            try {
                var manager = new MessageManager();
                manager.ProcessMessages();
            } finally {
                lock.Release();
            }
        });
    }
}

我已经有一段时间没有编写 C# 了,但这应该 运行 你的方法同时、重复、永远重复 10 次。

正如@calebboyd 所建议的,您必须首先使您的线程异步。现在,如果你去这里—— ,您会发现一个异步线程足以快速汇集网络资源。如果您能够在单个请求中从亚马逊获取多条消息,那么您的生产者线程(对亚马逊进行异步调用的线程)就可以了——它每秒可以发送数百个请求。这不会是您的瓶颈。但是,处理接收到的数据的后续任务将交给线程池。在这里您有可能遇到瓶颈 - 假设每秒有 100 个响应到达,每个响应包含 100 条消息(达到您的 10K msgs/sec 近似值)。每秒您有 100 个新任务,每个任务都需要您的线程处理 100 条消息。现在有两种选择:(1) 这些消息的处理不受 CPU 约束 - 您只需将它们发送到您的数据库,或者 (2),您执行 CPU 消耗性计算,例如科学计算、序列化或一些繁重的业务逻辑。如果 (1) 是你的情况,那么瓶颈就会向后推向你的数据库。如果 (2),那么您别无选择,只能扩大/缩小或优化计算。但是你的瓶颈可能不是生产线程 - 如果它实施正确(参见上面的 link 示例)。