Thread Local Storage工作原理

Thread Local Storage working principle

这是 Apress 并行编程书籍中有关线程本地存储 (TLS) 的示例。我知道如果我们有 4 个核心计算机,4 个线程可以同时 运行 并行。在这个例子中,我们创建了 10 个任务,我们假设有 4 个核心计算机。每个线程本地存储都存在于线程中,因此当启动 10 个并行任务时,只有 4 个线程执行。我们有 4 个 TLS,所以 10 个任务尝试更改 4 个线程本地存储对象。我想问一下当线程数 < 任务数时 Tls 如何防止数据竞争问题??

using System;
using System.Threading;
using System.Threading.Tasks;
namespace Listing_04
{
    class BankAccount
    {
        public int Balance
        {
            get;
            set;
        }
    }
    class Listing_04
    {
        static void Main(string[] args)
        {
            // create the bank account instance
            BankAccount account = new BankAccount();
            // create an array of tasks
            Task<int>[] tasks = new Task<int>[10];
            // create the thread local storage
            ThreadLocal<int> tls = new ThreadLocal<int>();
            for (int i = 0; i < 10; i++)
            {
                // create a new task
                tasks[i] = new Task<int>((stateObject) =>
                {
                    // get the state object and use it
                    // to set the TLS data
                    tls.Value = (int)stateObject;
                    // enter a loop for 1000 balance updates
                    for (int j = 0; j < 1000; j++)
                    {
                        // update the TLS balance
                        tls.Value++;
                    }
                    // return the updated balance
                    return tls.Value;
                }, account.Balance);
                // start the new task
                tasks[i].Start();
            }
            // get the result from each task and add it to
            // the balance
            for (int i = 0; i < 10; i++)
            {
                account.Balance += tasks[i].Result;
            }
            // write out the counter value
            Console.WriteLine("Expected value {0}, Balance: {1}",
            10000, account.Balance);
            // wait for input before exiting
            Console.WriteLine("Press enter to finish");
            Console.ReadLine();
        }
    }
}

We have 4 TLS so 10 task try to change 4 Thread local storage object

在您的示例中,您可以有 1 到 10 个 TLS 槽。这是因为 a) 您没有显式管理线程,因此任务是使用线程池执行的,并且 b) 线程池根据需要随着时间的推移创建和销毁线程。

只有 1000 次迭代的循环将几乎瞬间完成。因此,在线程池确定某个工作项已等待足够长的时间以证明添加任何新线程之前,您的所有 10 个任务都可能通过线程池。但是没有保证

the documentation 的一些重要部分包括这些语句:

By default, the minimum number of threads is set to the number of processors on a system

When demand is low, the actual number of thread pool threads can fall below the minimum values.

换句话说,在您的四核系统上,默认的最小线程数是四个,但线程池中 实际 活跃线程数实际上可能是少于那个。如果任务执行时间足够长,活动线程的数量可能会超过该数量。

这里要记住的最重要的事情是 在线程池的上下文中使用 TLS 几乎肯定是错误的做法。

当您可以控制线程时使用 TLS,并且您希望线程能够维护一些私有数据或该线程独有的数据。这与使用线程池时发生的情况 相反。即使在最简单的情况下,多个任务也可以使用同一个线程,因此最终会共享 TLS。在更复杂的场景中,例如使用 await 时,单个任务可能会在不同的线程中执行,因此一个任务可能会使用不同的 TLS 值,具体取决于当时分配给该任务的线程时刻.

how Tls prevent data race problem when thread count < Task count ??

这取决于你在说什么 "data race problem"

事实是,您发布的代码充满了问题,如果不是完全错误的话,至少也很奇怪。例如,您传递 account.Balance 作为每个任务的初始值。但为什么?在您创建任务时会评估此值,之后才能对其进行修改,那么传递它有什么意义?

如果您认为在任务开始时传递的是当前值,那似乎也是错误的。为什么让给定任务的起始值根据已经完成并在您的后续循环中考虑的任务数量而变化是有效的? (需要明确的是:那是 而不是 正在发生的事情……但即使是,这也是一件奇怪的事情。)

除此之外,尚不清楚您认为在这里使用 TLS 会完成什么。当每个任务开始时,您将 TLS 值重新初始化为 0(即您已传递给 Task<int> 构造函数的 account.Balance 的值)。因此,在执行任何给定任务的上下文中,所涉及的任何线程都不会看到 0 以外的值。局部变量将完成完全相同的事情,没有 TLS 的开销,也不会混淆阅读代码并试图弄清楚为什么在 TLS 对代码没有任何价值的情况下使用 TLS 的人。

那么,TLS 是否解决了某种 "data race problem"?不是在这个例子中,它似乎没有。所以问它是如何做到的是不可能回答的。它不会那样做,所以没有 "how".


对于它的价值,我稍微修改了您的示例,以便它可以报告分配给任务的各个线程。我发现在我的机器上,使用的线程数在 2 到 8 之间变化。这与我的八核机器一致,由于池中的第一个线程在池初始化其他线程并向它们分配任务之前可以完成多少而有所不同。最常见的是,我会看到第一个线程完成三到五个任务,其余任务由剩余的单独线程处理。

在每种情况下,线程池都会在任务启动后立即 创建 八个线程。但大多数时候,至少有一个线程未被使用,因为其他线程能够在池饱和之前完成任务。也就是说,线程池中仅管理任务的开销,在您的示例中,任务非常便宜,以至于该开销允许一个或多个线程池线程在线程池需要该线程用于另一个任务之前完成一个任务。

我在下面复制了那个版本。请注意,我还在试验迭代之间添加了延迟,以允许线程池终止它创建的线程(在我的机器上,这花费了 20 秒,因此延迟时间是硬编码的......您可以在调试器中看到线程被终止输出)。

static void Main(string[] args)
{
    while (_PromptContinue())
    {
        // create the bank account instance
        BankAccount account = new BankAccount();
        // create an array of tasks
        Task<int>[] tasks = new Task<int>[10];
        // create the thread local storage
        ThreadLocal<int> tlsBalance = new ThreadLocal<int>();
        ThreadLocal<(int Id, int Count)> tlsIds = new ThreadLocal<(int, int)>(
            () => (Thread.CurrentThread.ManagedThreadId, 0), true);
        for (int i = 0; i < 10; i++)
        {
            int k = i;
            // create a new task
            tasks[i] = new Task<int>((stateObject) =>
            {
                // get the state object and use it
                // to set the TLS data
                tlsBalance.Value = (int)stateObject;
                (int id, int count) = tlsIds.Value;
                tlsIds.Value = (id, count + 1);
                Console.WriteLine($"task {k}: thread {id}, initial value {tlsBalance.Value}");
                // enter a loop for 1000 balance updates
                for (int j = 0; j < 1000; j++)
                {
                    // update the TLS balance
                    tlsBalance.Value++;
                }
                // return the updated balance
                return tlsBalance.Value;
            }, account.Balance);
            // start the new task
            tasks[i].Start();
        }

        // Make sure this thread isn't busy at all while the thread pool threads are working
        Task.WaitAll(tasks);

        // get the result from each task and add it to
        // the balance
        for (int i = 0; i < 10; i++)
        {
            account.Balance += tasks[i].Result;
        }

        // write out the counter value
        Console.WriteLine("Expected value {0}, Balance: {1}", 10000, account.Balance);
        Console.WriteLine("{0} thread ids used: {1}",
            tlsIds.Values.Count,
            string.Join(", ", tlsIds.Values.Select(t => $"{t.Id} ({t.Count})")));
        System.Diagnostics.Debug.WriteLine("done!");
        _Countdown(TimeSpan.FromSeconds(20));
    }
}

private static void _Countdown(TimeSpan delay)
{
    System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();

    TimeSpan remaining = delay - sw.Elapsed,
        sleepMax = TimeSpan.FromMilliseconds(250);
    int cchMax = $"{delay.TotalSeconds,2:0}".Length;
    string format = $"\r{{0,{cchMax}:0}}", previousText = null;

    while (remaining > TimeSpan.Zero)
    {
        string nextText = string.Format(format, remaining.TotalSeconds);

        if (previousText != nextText)
        {
            Console.Write(format, remaining.TotalSeconds);
            previousText = nextText;
        }
        Thread.Sleep(remaining > sleepMax ? sleepMax : remaining);
        remaining = delay - sw.Elapsed;
    }

    Console.Write(new string(' ', cchMax));
    Console.Write('\r');
}

private static bool _PromptContinue()
{
    Console.Write("Press Esc to exit, any other key to proceed: ");
    try
    {
        return Console.ReadKey(true).Key != ConsoleKey.Escape;
    }
    finally
    {
        Console.WriteLine();
    }
}