System-wide global variable / semaphore / mutex in C++/Linux?

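Is it possible to create a system-wide global variable / semaphore / mutex in C++ on Linux?

Here is the reason: I have a system that often runs multiple copies of the same software on unrelated data. It is common to have 4 jobs, each running the same software. The software has one small section where it creates a huge graph that takes a lot of memory; outside that section, memory usage is moderate.

Sometimes 2 jobs hit that memory-hungry section at the same time and the whole system starts swapping. We want to prevent that by creating something like a critical-section mutex between the different jobs, so that no more than one of them allocates a lot of memory at a time.

If these were threads of the same job, pthread locks would do the job.

What would be a good way to implement such a mutex between different jobs?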

If you can get all the processes to agree on a common name, you can use a named semaphore.

A named semaphore is identified by a name of the form /somename; that is, a null-terminated string of up to NAME_MAX-4 (i.e., 251) characters consisting of an initial slash, followed by one or more characters, none of which are slashes. Two processes can operate on the same named semaphore by passing the same name to sem_open(3).
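A minimal sketch of how a named semaphore could guard the memory-hungry section. The class name `NamedSemaphoreLock`, the 0644 mode, and the initial value of 1 are assumptions made for illustration, not something from the question; error handling is reduced to a `locked()` flag.

```cpp
#include <cassert>
#include <fcntl.h>      // O_CREAT
#include <semaphore.h>  // sem_open, sem_wait, sem_post, sem_close

// Hypothetical RAII guard that uses a named POSIX semaphore as a
// cross-process mutex. Not part of any library.
class NamedSemaphoreLock {
public:
    // Opens the semaphore (creating it with value 1 if it does not exist)
    // and blocks until it can be decremented.
    explicit NamedSemaphoreLock(const char* name)
        : sem_(SEM_FAILED) {
        assert(name != nullptr && name[0] == '/');  // names look like "/somename"
        sem_ = sem_open(name, O_CREAT, 0644, 1);
        if (sem_ != SEM_FAILED && sem_wait(sem_) == 0) {
            locked_ = true;
        }
    }

    // Releases the semaphore if it was acquired, then closes the handle.
    ~NamedSemaphoreLock() {
        if (locked_) sem_post(sem_);
        if (sem_ != SEM_FAILED) sem_close(sem_);
    }

    NamedSemaphoreLock(const NamedSemaphoreLock&) = delete;
    NamedSemaphoreLock& operator=(const NamedSemaphoreLock&) = delete;

    bool locked() const { return locked_; }

private:
    sem_t* sem_;
    bool locked_ = false;
};
```

Each job would construct the guard with the same agreed name just before building the graph and let it go out of scope afterwards. One caveat worth keeping in mind: if a job is killed while holding the semaphore, the count is not restored automatically, and `sem_wait` can return early with `EINTR`, which this sketch simply reports as "not locked".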

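Mutual exclusion locks (mutexes) prevent multiple threads from simultaneously executing critical sections of code that access shared data (that is, mutexes are used to serialize the execution of threads). All mutexes must be global. A successful call to mutex_lock() causes any other thread that is also trying to lock the same mutex to block until the owner thread unlocks it with mutex_unlock(). Threads within the same process or within other processes can share mutexes.

Mutexes can synchronize threads within the same process or in other processes. A mutex can be used to synchronize threads between processes if it is allocated in writable memory that is shared among the cooperating processes (see mmap(2)) and has been initialized for this task.

For inter-process synchronization, a mutex needs to be allocated in memory shared between these processes. Since the memory for such a mutex must be allocated dynamically, the mutex needs to be explicitly initialized using mutex_init(). In addition to being allocated in shared memory, the mutex must also use the attribute PTHREAD_PROCESS_SHARED; otherwise, accessing the mutex from a process other than its creator results in undefined behavior (see linux.die.net/man/3/pthread_mutexattr_setpshared): "The process-shared attribute is set to PTHREAD_PROCESS_SHARED to permit a mutex to be operated upon by any thread that has access to the memory where the mutex is allocated, even if the mutex is allocated in memory that is shared by multiple processes."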
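The requirements above (shared memory plus PTHREAD_PROCESS_SHARED) can be sketched roughly as follows for unrelated processes on Linux. The helper name `open_shared_mutex`, the 0600 mode, and the use of `O_EXCL` to decide which process initializes the mutex are my assumptions. Note that this sketch does not close the window in which a second process maps the object before the creator has finished initializing it.

```cpp
#include <cassert>
#include <fcntl.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// Hypothetical helper: maps a pthread_mutex_t stored in the POSIX shared
// memory object `name`. The process that creates the object initializes
// the mutex as process-shared; later processes only map it.
// Returns nullptr on failure.
pthread_mutex_t* open_shared_mutex(const char* name) {
    assert(name != nullptr);
    bool creator = true;
    int fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, 0600);
    if (fd < 0) {
        // Most likely the object already exists: open it without creating.
        creator = false;
        fd = shm_open(name, O_RDWR, 0600);
        if (fd < 0) return nullptr;
    }
    if (creator && ftruncate(fd, sizeof(pthread_mutex_t)) != 0) {
        close(fd);
        return nullptr;
    }
    void* mem = mmap(nullptr, sizeof(pthread_mutex_t),
                     PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);  // the mapping stays valid after the descriptor is closed
    if (mem == MAP_FAILED) return nullptr;

    auto* mtx = static_cast<pthread_mutex_t*>(mem);
    if (creator) {
        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
        pthread_mutex_init(mtx, &attr);
        pthread_mutexattr_destroy(&attr);
    }
    return mtx;
}
```

A job would then wrap the expensive section in `pthread_mutex_lock()` / `pthread_mutex_unlock()` on the returned pointer. If a holder can die mid-section, a robust mutex (`pthread_mutexattr_setrobust`) may be worth looking at; that is outside this sketch.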

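For interprocess mutual exclusion, you can use file locking. On Linux, protecting the critical section is as simple as calling flock:

#include <fcntl.h>      // open
#include <sys/file.h>   // flock
#include <unistd.h>     // close

// O_CREAT requires a mode argument; LOCK_FILE is a path all jobs agree on
int fd_lock = open(LOCK_FILE, O_CREAT | O_RDWR, 0666);

flock(fd_lock, LOCK_EX);   // blocks until no other process holds the lock

// do stuff

flock(fd_lock, LOCK_UN);
close(fd_lock);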

If you need POSIX compatibility, you can use fcntl.
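For comparison, a rough sketch of the same idea with an fcntl() record lock. The helper names `lock_file_exclusive` and `unlock_file` and the 0666 mode are assumptions for illustration only.

```cpp
#include <cassert>
#include <fcntl.h>   // open, fcntl, struct flock, F_SETLKW
#include <unistd.h>  // close, SEEK_SET

// Hypothetical helper: opens (creating if needed) the lock file and blocks
// until an exclusive lock on the whole file is granted.
// Returns the descriptor, or -1 on failure.
int lock_file_exclusive(const char* path) {
    assert(path != nullptr);
    int fd = open(path, O_CREAT | O_RDWR, 0666);
    if (fd < 0) return -1;

    struct flock fl{};
    fl.l_type = F_WRLCK;     // exclusive (write) lock
    fl.l_whence = SEEK_SET;
    fl.l_start = 0;
    fl.l_len = 0;            // 0 means "through end of file"
    if (fcntl(fd, F_SETLKW, &fl) == -1) {  // F_SETLKW waits for the lock
        close(fd);
        return -1;
    }
    return fd;
}

// Releases the lock taken by lock_file_exclusive() and closes the file.
void unlock_file(int fd) {
    struct flock fl{};
    fl.l_type = F_UNLCK;
    fl.l_whence = SEEK_SET;
    fl.l_start = 0;
    fl.l_len = 0;
    fcntl(fd, F_SETLK, &fl);
    close(fd);
}
```

Both flock and fcntl locks are dropped by the kernel when the holding process exits, which suits the crashing-job case. Keep in mind that fcntl locks belong to the process, so closing any descriptor for the same file in that process releases them.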

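You can make C++ mutexes work across process boundaries on Linux. However, there is some black magic involved, which makes it less appropriate for production code.

Explanation:

The standard library's std::mutex and std::shared_mutex use pthread's struct pthread_mutex_s and pthread_rwlock_t under the hood. The native_handle() method returns a pointer to one of these structures.

The drawback is that certain details are abstracted out of the standard library and defaulted in the implementation. For example, std::shared_mutex creates its underlying pthread_rwlock_t structure by passing NULL as the second parameter to pthread_rwlock_init(). That parameter is supposed to be a pointer to a pthread_rwlockattr_t structure containing an attribute that determines the sharing policy.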

public:
    __shared_mutex_pthread()
    {
        int __ret = pthread_rwlock_init(&_M_rwlock, NULL);
        ...

In theory, it should therefore receive the default attributes. According to the man page for pthread_rwlockattr_getpshared():

The default value of the process-shared attribute is PTHREAD_PROCESS_PRIVATE.

That said, both std::shared_mutex and std::mutex work across processes anyway. I'm using Clang 6.0.1 (x86_64-unknown-linux-gnu / POSIX thread model). Here is a description of what I did to check:

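  • Create a shared memory region with shm_open.

  • Check the size of the region with fstat to determine ownership. If .st_size is zero, then ftruncate() it, and the caller knows that it is the region's creating process.

  • Call mmap on it.

    • The creator process uses placement-new to construct a std::mutex or std::shared_mutex object within the shared region.
    • Later processes use reinterpret_cast<>() to obtain a typed pointer to the same object.
  • The processes now loop, calling try_lock() and unlock(). You can see them blocking one another by using printf() before and after try_lock() and before unlock().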
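The setup steps in the list above might look roughly like this. It is only a sketch of the experiment, not the code that was actually run: the names `SharedStdMutex` and `map_std_mutex` and the 0600 mode are assumptions, and using a std::mutex this way relies on undocumented implementation behavior.

```cpp
#include <cassert>
#include <fcntl.h>
#include <mutex>
#include <new>        // placement new
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// Hypothetical result type: the mapped mutex and whether this process
// created the region.
struct SharedStdMutex {
    std::mutex* mutex;  // nullptr on failure
    bool creator;
};

// Hypothetical helper following the steps listed above.
SharedStdMutex map_std_mutex(const char* name) {
    assert(name != nullptr);
    SharedStdMutex result{nullptr, false};

    int fd = shm_open(name, O_CREAT | O_RDWR, 0600);
    if (fd < 0) return result;

    // A zero-sized region means nobody has set it up yet: we are the creator.
    struct stat st{};
    if (fstat(fd, &st) != 0) { close(fd); return result; }
    if (st.st_size == 0) {
        if (ftruncate(fd, sizeof(std::mutex)) != 0) { close(fd); return result; }
        result.creator = true;
    }

    void* mem = mmap(nullptr, sizeof(std::mutex),
                     PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);
    if (mem == MAP_FAILED) return result;

    // Creator constructs the object in place; others reinterpret the bytes.
    result.mutex = result.creator ? new (mem) std::mutex
                                  : reinterpret_cast<std::mutex*>(mem);
    return result;
}
```

Each process would then loop on `result.mutex->try_lock()` and `unlock()` with some printing around the calls to observe the interaction.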

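Extra detail: I was interested to see whether something was wrong with the C++ headers or with the pthreads implementation, so I dug into pthread_rwlock_arch_t. You'll find a __shared attribute that is zero, and a __flags attribute that is also zero for the field denoted by __PTHREAD_RWLOCK_INT_FLAGS_SHARED. So it seems that by default this structure is not intended to be shared, although it appears to provide this facility anyway (as of July 2019).

Summary

It seems to work, though somewhat by chance. I would advise caution when writing production software that works contrary to the documentation.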
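For contrast, the documented route is to initialize the pthread_rwlock_t yourself with an explicit attribute instead of going through std::shared_mutex. A minimal sketch, assuming the caller has already placed the lock in memory shared between the processes (for example via shm_open and mmap); the function name is hypothetical.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical helper: initializes `lock` as a process-shared rwlock.
// `lock` is assumed to point into memory that all cooperating processes map.
// Returns 0 on success or a pthread error code.
int init_process_shared_rwlock(pthread_rwlock_t* lock) {
    assert(lock != nullptr);
    pthread_rwlockattr_t attr;
    int rc = pthread_rwlockattr_init(&attr);
    if (rc != 0) return rc;

    // Override the PTHREAD_PROCESS_PRIVATE default quoted above.
    rc = pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    if (rc == 0) {
        rc = pthread_rwlock_init(lock, &attr);
    }
    pthread_rwlockattr_destroy(&attr);
    return rc;
}
```

Only one process should run this initialization; the others would use `pthread_rwlock_rdlock()` / `pthread_rwlock_wrlock()` on the mapped lock directly.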

I looked at using the shared-pthread-mutex solution but didn't like the logic race in it. So I wrote a class to do this using the atomic builtins.

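#include <cerrno>       // errno, ESRCH
#include <iostream>     // std::cerr
#include <string>
#include <signal.h>     // kill
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>     // getpid, ftruncate, close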

using std::string;

//from the command line - "ls /dev/shm" and "lsof /dev/shm/<name>" to see which process ID has access to it

template<typename PAYLOAD>
class InterprocessSharedVariable
{
protected:
    int mSharedMemHandle;
    string const mSharedMemoryName;
    bool mOpenedMemory;
    bool mHaveLock;
    pid_t mPID;

    // this is the shared memory structure
    typedef struct 
    {
        pid_t mutex;
        PAYLOAD payload;
    }
    tsSharedPayload;


    tsSharedPayload* mSharedData;


    bool openSharedMem()
    {
        mPID = getpid();

        // The following caters for the shared mem being created by root but opened by non-root,
        //  giving the shared-memory 777 permissions.
        int openFlags = O_CREAT | O_RDWR;
        int shareMode = S_IRWXU | S_IRWXG | S_IRWXO;

        // clear the umask temporarily so the requested mode is applied unmodified
        // store old
        mode_t old_umask = umask(0);

        mSharedMemHandle = shm_open (mSharedMemoryName.c_str(), openFlags, shareMode);

        // restore old
        umask(old_umask);

        if (mSharedMemHandle < 0) 
        {
            std::cerr << "failed to open shared memory"  << std::endl;
            return false;
        }

        if (-1 == ftruncate(mSharedMemHandle, sizeof(tsSharedPayload)))
        {
            std::cerr <<  "failed to resize shared memory" << std::endl;
            return false;
        }

        mSharedData = (tsSharedPayload*) mmap (NULL, 
                                            sizeof(tsSharedPayload),
                                            PROT_READ | PROT_WRITE,
                                            MAP_SHARED,
                                            mSharedMemHandle,
                                            0);

        if (MAP_FAILED == mSharedData)
        {
            std::cerr << "failed to map shared memory" << std::endl;
            return false;
        }

        return true;
    }


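    void closeSharedMem()
    {
        if (mSharedMemHandle > 0)
        {
            // unmap only if openSharedMem() completed, so mSharedData is valid
            if (mOpenedMemory)
            {
                munmap (mSharedData, sizeof(tsSharedPayload));
            }
            // close the descriptor so it does not leak
            close (mSharedMemHandle);
            mSharedMemHandle = 0;
            shm_unlink (mSharedMemoryName.c_str());
        }
    }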

public:
    InterprocessSharedVariable () = delete;

    // take the name by const reference so both temporaries and named strings are accepted
    InterprocessSharedVariable (string const& sharedMemoryName) : mSharedMemoryName(sharedMemoryName)
    {
        mSharedMemHandle = 0;
        mOpenedMemory = false;
        mHaveLock = false;
        mPID = 0;
    }

    virtual ~InterprocessSharedVariable ()
    {
        releaseSharedVariable ();
        closeSharedMem ();
    }

    // no copying
    InterprocessSharedVariable (InterprocessSharedVariable const&) = delete;
    InterprocessSharedVariable& operator= (InterprocessSharedVariable const&) = delete;


    bool tryLockSharedVariable (pid_t& ownerProcessID)
    {
        // Double-checked locking.  See if a process has already grabbed the mutex.  Note the process could be dead
        __atomic_load (&mSharedData->mutex, &ownerProcessID, __ATOMIC_SEQ_CST);

        if (0 != ownerProcessID)
        {
            // It is possible that we have started with the same PID as a previous process that terminated abnormally
            if (ownerProcessID == mPID)
            {
                // ... in which case, we already "have ownership";
                // record it so releaseSharedVariable() clears the stale entry
                mHaveLock = true;
                return (true);
            }

            // Another process may have the mutex.  Check whether it is alive.
            // We are specifically looking for an error returned with ESRCH
            // Note that if the other process is owned by root, "kill 0" may return a permissions error (which indicates the process is running!)
            int processCheckResult = kill (ownerProcessID, 0);

            if ((0 == processCheckResult) || (ESRCH != errno))
            {
                // another process owns the shared memory and is running
                return (false);
            }

            // Here: The other process does not exist ((0 != processCheckResult) && (ESRCH == errno))
            // We could assume here that we can now take ownership, but be proper and fall into the compare-exchange
            ownerProcessID = 0;
        }

        // It's possible that another process has snuck in here and taken ownership of the shared memory.
        // If that has happened, the exchange will "fail" (and the existing PID is stored in ownerProcessID)

        // ownerProcessID == 0 -> representing the "expected" value
        mHaveLock = __atomic_compare_exchange_n (&mSharedData->mutex,
                                                &ownerProcessID,      //"expected"
                                                mPID,                 //"desired"
                                                false,                //"weak"
                                                __ATOMIC_SEQ_CST,     //"success-memorder"
                                                __ATOMIC_SEQ_CST);    //"fail-memorder"

        return (mHaveLock);
    }


    bool acquireSharedVariable (bool& failed, pid_t& ownerProcessID)
    {
        if (!mOpenedMemory)
        {
            mOpenedMemory = openSharedMem ();

            if (!mOpenedMemory)
            {
                ownerProcessID = 0;
                failed = true;
                return false;
            }
        }

        // infrastructure is working
        failed = false;

        bool gotLock = tryLockSharedVariable (ownerProcessID);
        return (gotLock);
    }

    void releaseSharedVariable ()
    {
        if (mHaveLock)
        {
            __atomic_store_n (&mSharedData->mutex, 0, __ATOMIC_SEQ_CST);
            mHaveLock = false;
        }
    }
};

Example usage - here we simply use it to ensure that only one instance of the application runs.

int main(int argc, char *argv[])
{
    typedef struct { } tsEmpty;
    InterprocessSharedVariable<tsEmpty> programMutex ("/run-once");

    bool memOpenFailed;
    pid_t ownerProcessID;
    if (!programMutex.acquireSharedVariable (memOpenFailed, ownerProcessID))
    {
        if (memOpenFailed)
        {
            std::cerr << "Failed to open shared memory" << std::endl;
        }
        else
        {
            std::cerr << "Program already running - process ID " << ownerProcessID << std::endl;
        }
        return -1;
    }

    // ... do stuff ...

    return 0;
}