在 C# 中将 while 循环与 Task.Run() 相结合

Combining a while loop with Task.Run() in C#

我对 C# 中的多线程应用程序还很陌生,我正在尝试编辑下面的代码,以便它 运行 在多个线程上运行。现在它同步运行并且占用很少的 cpu 电量。我需要它在多个线程上 运行 更快。我的想法是为每个核心启动一个任务,然后当一个任务完成时,让另一个任务取代它或类似的东西,如果可能的话。

static void Main(string[] args)
    {
        string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
        QueueClient Client = QueueClient.CreateFromConnectionString(connectionString, "OoplesQueue");

        try
        {
            while (true)
            {
                Task.Run(() => processCalculations(Client));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            Console.WriteLine(ex.StackTrace);
        }
    }

    public static ConnectionMultiplexer connection;
    public static IDatabase cache;

    public static async Task processCalculations(QueueClient client)
    {
        try
        {
            BrokeredMessage message = await client.ReceiveAsync();

            if (message != null)
            {
                if (connection == null || !connection.IsConnected)
                {

                    connection = await ConnectionMultiplexer.ConnectAsync("connection,SyncTimeout=10000,ConnectTimeout=10000");
                    //connection = ConnectionMultiplexer.Connect("connection,SyncTimeout=10000,ConnectTimeout=10000");
                }

                cache = connection.GetDatabase();

                string sandpKey = message.Properties["sandp"].ToString();
                string dateKey = message.Properties["date"].ToString();
                string symbolclassKey = message.Properties["symbolclass"].ToString();
                string stockdataKey = message.Properties["stockdata"].ToString();
                string stockcomparedataKey = message.Properties["stockcomparedata"].ToString();

                List<StockData> sandp = cache.Get<List<StockData>>(sandpKey);
                DateTime date = cache.Get<DateTime>(dateKey);
                SymbolInfo symbolinfo = cache.Get<SymbolInfo>(symbolclassKey);
                List<StockData> stockdata = cache.Get<List<StockData>>(stockdataKey);
                List<StockMarketCompare> stockcomparedata = cache.Get<List<StockMarketCompare>>(stockcomparedataKey);

                StockRating rating = performCalculations(symbolinfo, date, sandp, stockdata, stockcomparedata);

                if (rating != null)
                {
                    saveToTable(rating);
                    if (message.LockedUntilUtc.Minute <= 1)
                    {
                        await message.RenewLockAsync();
                    }
                    await message.CompleteAsync();
                }
                else
                {
                    Console.WriteLine("Message " + message.MessageId + " Completed!");
                    await message.CompleteAsync();
                }
            }
        }
        catch (TimeoutException time)
        {
            Console.WriteLine(time.Message);
        }
        catch (MessageLockLostException locks)
        {
            Console.WriteLine(locks.Message);
        }
        catch (RedisConnectionException redis)
        {
            Console.WriteLine("Start the redis server service!");
        }
        catch (MessagingCommunicationException communication)
        {
            Console.WriteLine(communication.Message);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            Console.WriteLine(ex.StackTrace);
        }
    }

您需要两个部分:

第 1 部分等待传入消息:ConnectAsync() 这 运行 在一个简单的循环中。每当收到某些内容时,就会启动 Part2 的实例来处理传入的消息。

第 2 部分 运行 在另一个线程/在后台处理单个传入消息。

这样 Part2 的多个实例可以并行 运行。

所以你的结构是这样的:

while (true)
{
    connection = await ConnectionMultiplexer.ConnectAsync(...);
    StartProcessCalculationsInBackground(connection, ...); // return immediately
}

Right now it operates synchronously and it takes up very little cpu power. I need it to run much faster on multiple threads.

"Multiple threads" 并不一定意味着 "faster"。仅当您要执行多个相互独立的计算并且它们是 CPU 绑定(意味着它们主要涉及 CPU 操作,而不是 IO 操作)时,这才是正确的。

此外,异步并不一定意味着多线程。这只是意味着您的操作在进行时不会阻塞进程线程。如果您正在启动另一个线程并阻塞它,那么 看起来像 异步,但实际上不是。观看第 9 频道的视频:Async Library Methods Shouldn't Lie

您在 processCalculations 中的大部分操作看起来都相互依赖;然而,这部分可能是一个潜在的改进点:

List<StockData> sandp = cache.Get<List<StockData>>(sandpKey);
DateTime date = cache.Get<DateTime>(dateKey);
SymbolInfo symbolinfo = cache.Get<SymbolInfo>(symbolclassKey);
List<StockData> stockdata = cache.Get<List<StockData>>(stockdataKey);
List<StockMarketCompare> stockcomparedata = cache.Get<List<StockMarketCompare>>(stockcomparedataKey);
StockRating rating = performCalculations(symbolinfo, date, sandp, stockdata, stockcomparedata);

我不熟悉您正在使用的 API,但如果它包含与 Get 方法等效的异步方法,您可能能够并行异步执行这些 IO 操作,例如:

var sandpTask = List<StockData> sandp = cache.GetAsync<List<StockData>>(sandpKey);
var dateTask = cache.GetAsync<DateTime>(dateKey);
var symbolinfoTask = cache.GetAsync<SymbolInfo>(symbolclassKey);
var stockdataTask = cache.GetAsync<List<StockData>>(stockdataKey);
var stockcomparedataTask = cache.GetAsync<List<StockMarketCompare>>(stockcomparedataKey);

await Task.WhenAll(sandpTask, dateTask,symbolinfoTask,
    stockdataTask, stockcomparedataTask);

List<StockData> sandp = sandpTask.Result;
DateTime date = dateTask.Result;
SymbolInfo symbolinfo = symbolinfoTask.Result;
List<StockData> stockdata = stockdataTask.Result;
List<StockMarketCompare> stockcomparedata = stockcomparedataTask.Result;

StockRating rating = performCalculations(symbolinfo, date, sandp, stockdata, stockcomparedata);

另外,请注意您不需要将 processCalculations 调用包装在另一个任务中,因为它已经 returns 一个任务:

// instead of Task.Run(() => processCalculations(message));
processCalculations(message); 

这看起来像是经典的生产者-消费者模式。

在这种情况下,您需要并发结合 async IO 绑定操作(例如从 Redis 缓存中检索数据)和 CPU绑定 操作(例如进行计算绑定计算),我会利用 TPL Dataflow 来完成这项工作。

您可以使用 ActionBlock<T> 来负责处理您传递给它的单个操作。在幕后,它负责并发,而您可以通过传递 ExecutionDataflowBlockOptions 来根据需要限制它。

您首先创建 ActionBlock<BrokeredMessage>:

private static void Main(string[] args)
{
    var actionBlock = new ActionBlock<BrokeredMessage>(async message =>
                      await ProcessCalculationsAsync(message),
                      new ExecutionDataflowBlockOptions 
                      {
                          MaxDegreeOfParallelism = Environment.ProcessorCount
                      });

    var produceMessagesTask = Task.Run(async () => await
                                                   ProduceBrokeredMessagesAsync(client, 
                                                   actionBlock));

    produceMessagesTask.Wait();
}

现在让我们看看 ProduceBrokeredMessageAsync。它只是接收您的 QueueClientActionBlock 到以下内容:

private async Task ProduceBrokeredMessagesAsync(QueueClient client,
                                                ActionBlock<BrokeredMessage> actionBlock)
{
    BrokeredMessage message;
    while ((message = await client.ReceiveAsync()) != null)
    {
        await actionBlock.SendAsync(message);
    }
    actionBlock.Complete();
    await actionBlock.Completion;
}

当您从 QueueClient 接收消息时,它会异步 post 将消息发送到 ActionBlock,后者将同时处理这些消息。