我如何为数组中的元素锁定 MUTEX,而不是为整个数组锁定

How can i lock a MUTEX for an element in the array, not for the complete array

问题的简短版本:我有 2 个函数共享同一个数组,当一个正在编辑它时,另一个正在读取它。但是,向量很长(5000 个样本)并且很少发生并发访问。但是 MUTEX1 上的 Mutex 争用正在减慢程序的速度。 '

如何锁定内存的某些位置而不是整个块以减少争用?

编辑:注意:我必须尽可能使用更新的 G 值。

EDIT2:例如我有长度为 5000 的数组 G。foo1 锁定 mutex1 以编辑索引 124。虽然 foo2 想要编辑索引 2349,但直到 foo1 发布 mutex1

有什么方法可以将锁定互斥锁的争用下移到元素级别?意思是:我希望 foo2foo1 只在同一个互斥锁上竞争,只有当他们想要编辑同一个索引时。例如:foo1 想编辑索引 3156,foo2 想编辑索引 3156。

带有代码解释的长版本: 我正在为一个复杂的数学函数编写代码,我正在使用 pthreads 来并行代码并提高性能。代码非常复杂,我可以 post 它但我可以 post 代码的模型。

基本上我有 2 个数组,我想使用 2 个 运行 并行的线程来编辑它们。一个线程 运行s foo1 和另一个 运行s foo2。但是,它们应该按特定顺序 运行,我使用 mutexes(_B_A1_A2) 来授予该顺序。它如下:

foo1 (first half)
foo2 (first half) and foo1 (second half) (in parallel)
foo1 (first half) and foo2 (second half) (in parallel)
...
foo2(second half)

然后我会检索我的结果。 在 foo1 的前半部分,我将使用 G1 中的结果,foo2 可能会同时对其进行编辑。因此我使用 Mutex1 来保护它。 Gfoo2 中也发生了同样的情况。然而,为 1 个值锁定整个向量是非常低效的,它们几乎从不同时编辑相同的内存位置。当我比较结果时,结果几乎总是一样的。我想要一种一次锁定一个元素的方法,以便他们只竞争相同的元素。

我将为有兴趣了解其工作原理的人描述代码:

#include <pthread.h>
#include <iostream>

using namespace std;

#define numThreads 2
#define Length 10000

pthread_t threads[numThreads];

pthread_mutex_t mutex1   = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_B  = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_A1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_A2 = PTHREAD_MUTEX_INITIALIZER;

struct data_pointers
{
    double  *A;
    double  *B;
    double  *G;
    double  *L;
    int idxThread;
};

void foo1   (data_pointers &data);
void foo2   (data_pointers &data);

void *thread_func(void *arg){
    data_pointers data = *((data_pointers *) arg);
    if (data.idxThread==0)
        foo1 (data);
    else
        foo2 (data);
}

到这里是定义和线程调用函数,记住我定义了Length 10000numThreads 2

void foo1 ( data_pointers &data)
{
    double *A           = data.A;
    double *L           = data.L; 
    double *G           = data.G; 
    double U;

    for (int ijk =0;ijk<5;ijk++){
        /* here goes some definitions*/

        pthread_mutex_lock(&Mutex_A1);

        for (int k =0;k<Length;k++){
            pthread_mutex_lock(&mutex1); 
            U = G[k];
            pthread_mutex_unlock(&mutex1);
            /*U undergoes a lot of mathematical operations here


            */
        }

        pthread_mutex_lock(&Mutex_B);
        pthread_mutex_unlock(&Mutex_A2);
        for (int k =0;k<Length;k++){
            /*U another mathematical operations here


            */
            pthread_mutex_lock(&mutex1);
            L[k] = U;
            pthread_mutex_unlock(&mutex1);
            pthread_mutex_unlock(&Mutex_B);
        }
    }
}

在 foo1 中我锁定 mutexA1 并完成我的工作,然后我锁定 MutexB 并解锁 MutexA2 因此 foo2 可以开始工作。请注意 main 从锁定 MutexA2 开始。这样我保证 foo1mutexB 锁定的情况下开始后半部分,这样, foo2 无法进入功能的后半部分,直到 foo1 解锁 mutexB

void foo2 (data_pointers &data)
{
    double *A           = data.A;
    double *L           = data.L; 
    double *G           = data.G; 
    double U;

    for (int ijk =0;ijk<5;ijk++){
        /* here goes some definitions*/

        pthread_mutex_lock(&Mutex_A1);

        for (int k =0;k<Length;k++){
            pthread_mutex_lock(&mutex1); 
            U = G[k];
            pthread_mutex_unlock(&mutex1);
            /*U undergoes a lot of mathematical operations here


            */
        }

        pthread_mutex_lock(&Mutex_B);
        pthread_mutex_unlock(&Mutex_A2);
        for (int k =0;k<Length;k++){        
            /*U another mathematical operations here


            */
            pthread_mutex_lock(&mutex1);
            L[k] = U;
            pthread_mutex_unlock(&mutex1);
            pthread_mutex_unlock(&Mutex_B);

        }
    }
}

现在,当foo1解锁mutexB时,它必须等待foo2解锁mutexA1才能工作,foo2只会解锁mutexA2 当它已经解锁时 mutexB

这种情况持续了 5 次。

int main(){
    double G1[Length];
    double G2[Length];
    double B1[Length];
    double B2[Length];
    double A2[Length];
    double A1[Length];
    data_pointers data[numThreads];

    data[0].L           = G2;
    data[0].G           = G1;   
    data[0].A           = A1;
    data[0].B           = B1;
    data[0].idxThread   = 0;

    data[1].L           = G1;
    data[1].G           = G2;   
    data[1].A           = A2;
    data[1].B           = B2;
    data[1].idxThread   = 1;

    pthread_mutex_lock(&Mutex_A2);

    pthread_create(&(threads[0]), NULL, thread_func, (void *) &(data[0]));
    pthread_create(&(threads[1]), NULL, thread_func, (void *) &(data[1]));
    pthread_join(threads[1], NULL);
    pthread_join(threads[0], NULL);

    pthread_mutex_unlock(&Mutex_A1);
    pthread_mutex_unlock(&Mutex_A2);

    return 0;
}

请注意,这只是示例代码。按预期编译和工作,但没有输出。

最后编辑:感谢大家提出的好主意,我有很多经验,并且很高兴听从这些建议。我将对所有有用的答案进行投票,并选择最接近原始问题的答案(原子性)

如果不调整数组大小,则不需要对单个元素或整个数组使用任何互斥。

自动读取您的值,自动写入您的值并保持冷静。

如果您希望在不使用互斥锁的情况下对类似数组的数据结构进行高性能多线程访问,您可以研究比较和交换。也许你可以设计一个无锁数据结构来解决你的特定问题。 https://en.wikipedia.org/wiki/Compare-and-swap

关于发布的代码,你似乎把事情复杂化了。如果你想达到:

foo1 (first half)
foo2 (first half) and foo1 (second half) (in parallel)
foo1 (first half) and foo2 (second half) (in parallel)
...
foo2(second half)

应该有两个互斥锁。

也许这个可以。下面是一些伪代码:

// These global variables controls which thread is allowed to
// execute first and second half.
// 1 --> Foo1 may run
// 2 --> Foo2 may run
int accessFirstHalf = 1;
int accessSecondHalf = 1;

void foo1 ( data_pointers &data)
{
    while(YOU_LIKE_TO_GO_ON)
    {
        while (true)
        {
            TAKE_MUTEX_FIRST_HALF;
            if (accessFirstHalf == 1)
            {
                RELEASE_MUTEX_FIRST_HALF;
                break;
            }
            RELEASE_MUTEX_FIRST_HALF;
            pthread_yield();
        }

        // Do the first half

        TAKE_MUTEX_FIRST_HALF;
        // Allow Foo2 to do first half
        accessFirstHalf == 2;
        RELEASE_MUTEX_FIRST_HALF;

        while (true)
        {
            TAKE_MUTEX_SECOND_HALF;
            if (accessSecondHalf == 1)
            {
                RELEASE_MUTEX_SECOND_HALF;
                break;
            }
            RELEASE_MUTEX_SECOND_HALF;
            pthread_yield();
        }

        // Do the second half

        TAKE_MUTEX_SECOND_HALF;
        // Allow Foo2 to do second half
        accessSecondHalf == 2;
        RELEASE_MUTEX_SECOND_HALF;
    }
}


void foo2 ( data_pointers &data)
{
    while(YOU_LIKE_TO_GO_ON)
    {
        while (true)
        {
            TAKE_MUTEX_FIRST_HALF;
            if (accessFirstHalf == 2)
            {
                RELEASE_MUTEX_FIRST_HALF;
                break;
            }
            RELEASE_MUTEX_FIRST_HALF;
            pthread_yield();
        }

        // Do the first half

        TAKE_MUTEX_FIRST_HALF;
        // Allow Foo1 to do first half
        accessFirstHalf == 1;
        RELEASE_MUTEX_FIRST_HALF;

        while (true)
        {
            TAKE_MUTEX_SECOND_HALF;
            if (accessSecondHalf == 2)
            {
                RELEASE_MUTEX_SECOND_HALF;
                break;
            }
            RELEASE_MUTEX_SECOND_HALF;
            pthread_yield();
        }

        // Do the second half

        TAKE_MUTEX_SECOND_HALF;
        // Allow Foo1 to do second half
        accessSecondHalf == 1;
        RELEASE_MUTEX_SECOND_HALF;
    }
}


int main()
{
    // start the threads with foo1 and foo2
}

使用指向内存中'lock'特定位置的原子指针的示例代码:

#include <vector>
#include <atomic>
#include <thread>

using container = std::vector<std::atomic<double>>;
using container_size_type = container::size_type;

container c(300);

std::atomic<container::pointer> p_busy_elem{ nullptr };

void editor()
{
    for (container_size_type i{ 0 }, sz{ c.size() }; i < sz; ++i)
    {
        p_busy_elem.exchange(&c[i]); // c[i] is busy
        // ... edit c[i] ... // E: calculate a value and assign it to c[i]
        p_busy_elem.exchange(nullptr); // c[i] is no longer busy
    }
}

void reader()
{
    for (container_size_type i{ 0 }, sz{ c.size() }; i < sz; ++i)
    {
        // A1: wait for editor thread to finish editing value
        while (p_busy_elem == &c[i])
        {
            // A2: room a better algorithm to prevent blocking/yielding
            std::this_thread::yield();
        }

        // B: if c[i] is updated in between A and B, this will load the latest value
        auto value = c[i].load();

        // C: c[i] might have changed by this time, but we had the most up to date value we could get without checking again
        // ... use value ...
    }
}

int main()
{
    std::thread t_editor{ editor };
    std::thread t_reader{ reader };
    t_editor.join();
    t_reader.join();
}

在编辑器线程中,忙指针被设置为指示当前正在编辑该内存位置 (E)。如果线程 B 在忙指针设置后尝试读取该值,它将等到编辑完成后再继续 (A1).

关于 A2 的注释:更好的系统可以放在这里。可以保留尝试读取时忙碌的节点列表,然后我们将 i 添加到该列表并尝试稍后处理该列表。好处:可以告诉循环执行 continue 并且将读取超过当前正在编辑的 i 的索引。

复制要读取的值 (B) 以便在需要时使用它 (C)。这是我们最后一次检查 c[i] 的最新值。

这似乎是您要求的核心:

foo1 (first half)
foo2 (first half) and foo1 (second half) (in parallel)
foo1 (first half) and foo2 (second half) (in parallel)
...
foo2(second half)

实现这种与 pthread 交错的最简单方法是使用屏障。

使用 count 2 初始化 pthread_barrier_init() 屏障。foo1() 然后执行:

first half
pthread_barrier_wait()
second half
pthread_barrier_wait()
...
first half
pthread_barrier_wait()
second half
pthread_barrier_wait()

foo2()执行的顺序略有不同:

pthread_barrier_wait()
first half
pthread_barrier_wait()
second half
....
pthread_barrier_wait()
first half
pthread_barrier_wait()
second half