Azure Table Storage: separate threads for ExecuteQuerySegmented and StreamWriter

I have a console application that retrieves data from Azure Table storage and exports it to a text file. Currently, my code looks like this:

using (StreamWriter writer = new StreamWriter(filePath))
{
    //header
    writer.WriteLine("PartitionKey|RowKey|Timestamp|Id|");

    do
    {
        //fetch the next segment (up to 1000 entities per call)
        var queryResult = table.ExecuteQuerySegmented(new TableQuery<TModel>(), token);
        //count what was actually returned; the last segment may hold fewer than 1000
        count += queryResult.Results.Count;

        Console.WriteLine("{0} records retrieved", count);

        //records
        foreach (TModel entity in queryResult.Results)
        {
            writer.WriteLine("\"" + entity.PartitionKey + "\"|\"" + entity.RowKey + "\"|" + entity.Timestamp + "|" + entity.Id);
        }
        token = queryResult.ContinuationToken;
    }
    while (token != null);
} //the using block disposes the writer, so no explicit Dispose() call is needed

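This code currently runs on a single thread, which means it fetches the first 1000 records, writes them to the text file, then fetches the next 1000 records and appends them to the file, and so on.

I would like to know whether there is a way to split the data retrieval and the file writing into two separate threads, so that one thread only executes the segmented query and another thread writes the data to the file (with something like a queue in between)?

Any suggestions are welcome. Thanks in advance!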

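Based on your description, I suggest creating two threads: a producer thread and a consumer thread. You also need a thread-safe collection shared between them. We can use the ConcurrentQueue<T> class to hold the entities that both threads access.

The following code is for your reference.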

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
//plus the table namespace of whichever Azure Storage SDK package you use

class Program
{
    //table, token, count, filePath and TModel are assumed to be defined elsewhere, as in the question

    static void Main(string[] args)
    {
        //Run producer thread
        Task.Run(new Action(ReadDataFromTable));
        //Run consumer thread
        Task.Run(new Action(WriteDataToText));

        //Keep the process alive while the tasks run
        Console.Read();
    }

    public static ConcurrentQueue<TModel> entitiesQueue = new ConcurrentQueue<TModel>();
    //This flag indicates whether the read process is done.
    //volatile makes the write from the producer visible to the consumer.
    public static volatile bool ReadTableFinished = false;

    public static void ReadDataFromTable()
    {
        try
        {
            do
            {
                var queryResult = table.ExecuteQuerySegmented(new TableQuery<TModel>(), token);
                count += queryResult.Results.Count;

                Console.WriteLine("{0} records retrieved", count);

                //Save entities to the queue
                foreach (TModel entity in queryResult.Results)
                {
                    entitiesQueue.Enqueue(entity);
                }
                token = queryResult.ContinuationToken;
            }
            while (token != null);
        }
        finally
        {
            //Set the flag even if the query throws, so the consumer does not wait forever
            ReadTableFinished = true;
        }
    }

    public static void WriteDataToText()
    {
        using (StreamWriter writer = new StreamWriter(filePath))
        {
            //header
            writer.WriteLine("PartitionKey|RowKey|Timestamp|Id|");

            while (true)
            {
                //Read the flag BEFORE trying to dequeue. If the reader had already finished
                //and the queue is still empty afterwards, no more entities can arrive.
                //Checking the flag after a failed dequeue could drop the last entities.
                bool finished = ReadTableFinished;

                TModel entity;
                //Read and remove entity from queue
                if (entitiesQueue.TryDequeue(out entity))
                {
                    //Write data to text file
                    writer.WriteLine("\"" + entity.PartitionKey + "\"|\"" + entity.RowKey + "\"|" + entity.Timestamp + "|" + entity.Id);
                }
                else if (finished)
                {
                    //All entities have been read from the table and the queue is empty: exit the thread
                    break;
                }
                else
                {
                    //Queue is temporarily empty: wait briefly instead of spinning
                    Thread.Sleep(10);
                }
            }
        } //the using block disposes the writer
    }
}
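
As an alternative to polling a ConcurrentQueue with a "finished" flag, the same producer/consumer idea can be expressed with BlockingCollection<T>, which handles the waiting and the completion signal itself. The following is only a minimal sketch under stated assumptions: BlockingPipelineSketch and Run are hypothetical names, each inner sequence stands in for one segment returned by ExecuteQuerySegmented, and each string stands in for one already formatted row.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class BlockingPipelineSketch
{
    // Hypothetical helper: 'segments' stands in for the pages returned by the
    // segmented query, 'output' for the StreamWriter from the question.
    // Returns the number of rows written.
    public static int Run(IEnumerable<IEnumerable<string>> segments, TextWriter output, int capacity = 5000)
    {
        // The bounded capacity keeps the producer from racing far ahead of the writer.
        using (var queue = new BlockingCollection<string>(capacity))
        {
            // Producer: add every row, then signal that nothing more will come.
            Task producer = Task.Run(() =>
            {
                try
                {
                    foreach (var segment in segments)
                    {
                        foreach (var row in segment)
                        {
                            queue.Add(row); // blocks while the queue is full
                        }
                    }
                }
                finally
                {
                    // Always signal completion so the consumer can exit.
                    queue.CompleteAdding();
                }
            });

            // Consumer: GetConsumingEnumerable blocks while the queue is empty and
            // ends once CompleteAdding has been called and the queue is drained.
            int written = 0;
            Task consumer = Task.Run(() =>
            {
                foreach (var row in queue.GetConsumingEnumerable())
                {
                    output.WriteLine(row);
                    written++;
                }
            });

            Task.WaitAll(producer, consumer);
            return written;
        }
    }
}
```

In the scenario from the question, the producer loop would call ExecuteQuerySegmented until the continuation token is null and Add each entity, while the consumer would write to the StreamWriter. With this shape there is no flag to check and no sleep in the consumer.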

If I add a breakpoint, it is never hit unless Console.Read() is there.

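If you don't add Console.Read() to block the main thread (a console application has no UI thread), Main returns right away. Task.Run schedules work on thread-pool threads, which are background threads, so the process exits as soon as Main returns and the tasks are terminated before they can finish, often before they even start. If you don't want to add Console.Read(), you can use the following code in the Main method. Task.WaitAll blocks the main thread until all the tasks have completed.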

static void Main(string[] args)
{
    //Run producer  thread
    Task t1 = Task.Run(new Action(ReadDataFromTable));
    //Run consumer  thread
    Task t2 = Task.Run(new Action(WriteDataToText));

    Task.WaitAll(t1, t2);
    Console.WriteLine("all completed");
}
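
If the project targets C# 7.1 or later, Main itself can be declared async, and the two tasks can be awaited with Task.WhenAll instead of blocking with Task.WaitAll. This is only a sketch: AsyncMainSketch and RunBothAsync are hypothetical names, and the two Action parameters stand in for ReadDataFromTable and WriteDataToText.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncMainSketch
{
    // Hypothetical helper: starts both delegates on the thread pool and
    // completes once both have finished.
    public static async Task<string> RunBothAsync(Action producer, Action consumer)
    {
        // Run producer thread
        Task t1 = Task.Run(producer);
        // Run consumer thread
        Task t2 = Task.Run(consumer);

        // Unlike Task.WaitAll, this does not block the calling thread while waiting.
        await Task.WhenAll(t1, t2);
        return "all completed";
    }
}
```

A console application could then declare `static async Task Main(string[] args)` and write `Console.WriteLine(await AsyncMainSketch.RunBothAsync(ReadDataFromTable, WriteDataToText));`. The process stays alive until the awaited task completes, so Console.Read() is not needed here either.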