使用原子索引从互斥锁数组中锁定互斥锁
Lock a mutex from mutex array with atomic index
我正在尝试编写一个可以将数据推送到缓冲区的缓冲区,检查缓冲区是否已满,并在必要时交换缓冲区。另一个线程可以获得文件输出的缓冲区。
我已经成功实现了缓冲区,但我想添加一个 ForceSwapBuffer 方法,该方法将强制交换不完整的缓冲区和 return 来自不完整缓冲区的数据。为此,我检查读取和写入缓冲区是否相同(尝试强制交换缓冲区以写入文件是没有用的,同时还有其他可以写入的完整缓冲区)。
我希望此方法能够 运行 与 GetBuffer 方法并存(不是真的有必要,但我想尝试一下并偶然发现了这个问题)。
GetBuffer 会阻塞,当 ForceSwapBuffer 完成时它仍然会阻塞,直到新缓冲区完全填满,因为在 ForceSwapBuffer 中我更改了原子 _read_buffer_index。我想知道这是否会一直有效? GetBuffer 的阻塞锁是否会检测原子 read_buffer_index 的变化并更改它试图锁定的互斥锁,或者它会在锁开始时检查它必须锁定的互斥锁并继续尝试锁定同一个互斥锁即使索引发生变化?
/* selection of member data */
unsigned int _size, _count;
std::atomic<unsigned int> _write_buffer_index, _read_buffer_index;
unsigned int _index;
std::unique_ptr< std::unique_ptr<T[]>[] > _buffers;
std::unique_ptr< std::mutex[] > _mutexes;
std::recursive_mutex _force_swap_buffer;
/* selection of implementation of member functions */
template<typename T> // included to show the use of the recursive_mutex
void Buffer<T>::Push(T *data, unsigned int length) {
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
if (_index + length <= _size) {
memcpy(&_buffers[_write_buffer_index][_index], data, length*sizeof(T));
_index += length;
} else {
memcpy(&_buffers[_write_buffer_index][_index], data, (_size - _index)*sizeof(T));
unsigned int t_index = _index;
SwapBuffer();
Push(&data[_size - t_index], length - (_size - t_index));
}
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::GetBuffer() {
std::lock_guard<std::mutex> lock(_mutexes[_read_buffer_index]); // where the magic should happen
std::unique_ptr<T[]> result(new T[_size]);
memcpy(result.get(), _buffers[_read_buffer_index].get(), _size*sizeof(T));
_read_buffer_index = (_read_buffer_index + 1) % _count;
return std::move(result);
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::ForceSwapBuffer() {
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer); // lock that forbids pushing and force swapping at the same time
if (_write_buffer_index != _read_buffer_index)
return nullptr;
std::unique_ptr<T[]> result(new T[_index]);
memcpy(result.get(), _buffers[_read_buffer_index].get(), _index*sizeof(T));
unsigned int next = (_write_buffer_index + 1) % _count;
_mutexes[next].lock();
_read_buffer_index = next; // changing the read_index while the other thread it blocked, the new mutex is already locked so the other thread should remain locked
_mutexes[_write_buffer_index].unlock();
_write_buffer_index = next;
_index = 0;
return result;
}
您的代码存在一些问题。首先,修改原子变量时要小心。只有一小部分操作是真正原子的(见 http://en.cppreference.com/w/cpp/atomic/atomic),原子操作的组合不是原子的。考虑:
_read_buffer_index = (_read_buffer_index + 1) % _count;
这里发生的是你有一个变量的原子读取、一个增量、一个模运算和一个原子存储。但是,整个语句本身 不是 原子的!如果 _count
是 2 的幂,您可以只使用 ++
运算符。如果不是,则必须将 _read_buffer_index
读入临时变量,执行上述计算,然后使用 compare_exchange
函数存储新值 如果变量未更改与此同时。显然后者必须循环执行,直到成功。您还必须担心一个线程在第二个线程的读取和 compare_exchange 之间递增变量 _count
次的可能性,在这种情况下,第二个线程错误地认为变量没有改变。
第二个问题是缓存行弹跳。如果你在同一个缓存行上有多个互斥体,那么如果两个或多个线程试图同时访问它们,性能将会非常糟糕。缓存行的大小取决于您的平台。
主要问题是,虽然 ForceSwapBuffer()
和 Push()
都锁定了 _force_swap_buffer
互斥量,但 GetBuffer()
没有。 GetBuffer()
然而确实改变了 _read_buffer_index
。所以在 ForceSwapBuffer()
:
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
if (_write_buffer_index != _read_buffer_index)
return nullptr;
// another thread can call GetBuffer() here and change _read_buffer_index
// rest of the code here
if
语句后 _write_buffer_index == _read_buffer_index
的假设实际上是无效的。
我正在尝试编写一个可以将数据推送到缓冲区的缓冲区,检查缓冲区是否已满,并在必要时交换缓冲区。另一个线程可以获得文件输出的缓冲区。
我已经成功实现了缓冲区,但我想添加一个 ForceSwapBuffer 方法,该方法将强制交换不完整的缓冲区和 return 来自不完整缓冲区的数据。为此,我检查读取和写入缓冲区是否相同(尝试强制交换缓冲区以写入文件是没有用的,同时还有其他可以写入的完整缓冲区)。 我希望此方法能够 运行 与 GetBuffer 方法并存(不是真的有必要,但我想尝试一下并偶然发现了这个问题)。
GetBuffer 会阻塞,当 ForceSwapBuffer 完成时它仍然会阻塞,直到新缓冲区完全填满,因为在 ForceSwapBuffer 中我更改了原子 _read_buffer_index。我想知道这是否会一直有效? GetBuffer 的阻塞锁是否会检测原子 read_buffer_index 的变化并更改它试图锁定的互斥锁,或者它会在锁开始时检查它必须锁定的互斥锁并继续尝试锁定同一个互斥锁即使索引发生变化?
/* selection of member data */
unsigned int _size, _count;
std::atomic<unsigned int> _write_buffer_index, _read_buffer_index;
unsigned int _index;
std::unique_ptr< std::unique_ptr<T[]>[] > _buffers;
std::unique_ptr< std::mutex[] > _mutexes;
std::recursive_mutex _force_swap_buffer;
/* selection of implementation of member functions */
template<typename T> // included to show the use of the recursive_mutex
void Buffer<T>::Push(T *data, unsigned int length) {
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
if (_index + length <= _size) {
memcpy(&_buffers[_write_buffer_index][_index], data, length*sizeof(T));
_index += length;
} else {
memcpy(&_buffers[_write_buffer_index][_index], data, (_size - _index)*sizeof(T));
unsigned int t_index = _index;
SwapBuffer();
Push(&data[_size - t_index], length - (_size - t_index));
}
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::GetBuffer() {
std::lock_guard<std::mutex> lock(_mutexes[_read_buffer_index]); // where the magic should happen
std::unique_ptr<T[]> result(new T[_size]);
memcpy(result.get(), _buffers[_read_buffer_index].get(), _size*sizeof(T));
_read_buffer_index = (_read_buffer_index + 1) % _count;
return std::move(result);
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::ForceSwapBuffer() {
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer); // lock that forbids pushing and force swapping at the same time
if (_write_buffer_index != _read_buffer_index)
return nullptr;
std::unique_ptr<T[]> result(new T[_index]);
memcpy(result.get(), _buffers[_read_buffer_index].get(), _index*sizeof(T));
unsigned int next = (_write_buffer_index + 1) % _count;
_mutexes[next].lock();
_read_buffer_index = next; // changing the read_index while the other thread it blocked, the new mutex is already locked so the other thread should remain locked
_mutexes[_write_buffer_index].unlock();
_write_buffer_index = next;
_index = 0;
return result;
}
您的代码存在一些问题。首先,修改原子变量时要小心。只有一小部分操作是真正原子的(见 http://en.cppreference.com/w/cpp/atomic/atomic),原子操作的组合不是原子的。考虑:
_read_buffer_index = (_read_buffer_index + 1) % _count;
这里发生的是你有一个变量的原子读取、一个增量、一个模运算和一个原子存储。但是,整个语句本身 不是 原子的!如果 _count
是 2 的幂,您可以只使用 ++
运算符。如果不是,则必须将 _read_buffer_index
读入临时变量,执行上述计算,然后使用 compare_exchange
函数存储新值 如果变量未更改与此同时。显然后者必须循环执行,直到成功。您还必须担心一个线程在第二个线程的读取和 compare_exchange 之间递增变量 _count
次的可能性,在这种情况下,第二个线程错误地认为变量没有改变。
第二个问题是缓存行弹跳。如果你在同一个缓存行上有多个互斥体,那么如果两个或多个线程试图同时访问它们,性能将会非常糟糕。缓存行的大小取决于您的平台。
主要问题是,虽然 ForceSwapBuffer()
和 Push()
都锁定了 _force_swap_buffer
互斥量,但 GetBuffer()
没有。 GetBuffer()
然而确实改变了 _read_buffer_index
。所以在 ForceSwapBuffer()
:
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
if (_write_buffer_index != _read_buffer_index)
return nullptr;
// another thread can call GetBuffer() here and change _read_buffer_index
// rest of the code here
if
语句后 _write_buffer_index == _read_buffer_index
的假设实际上是无效的。