使用匿名管道是否会为线程间通信引入内存障碍?
Does the use of an anonymous pipe introduce a memory barrier for interthread communication?
例如,假设我用new分配一个struct并将指针写入匿名管道的写端。
如果我从相应的读取端读取指针,我能保证看到结构上的'correct'内容吗?
同样有趣的是 socketpair() 在 unix 上的结果和在 windows 上通过 tcp 环回自连接的结果是否具有相同的保证。
上下文是一种服务器设计,它通过 select/epoll
集中事件调度
For example, say I allocate a struct with new and write the pointer into the write end of an anonymous pipe.
If I read the pointer from the corresponding read end, am I guaranteed to see the 'correct' contents on the struct?
没有。无法保证写入 CPU 会将写入从其缓存中清除并使其对可能进行读取的其他 CPU 可见。
Also of of interest is whether the results of socketpair() on unix & self connecting over tcp loopback on windows have the same guarantees.
没有
我相信,您的情况可能会减少到这个 2 线程模型:
int data = 0;
std::atomic<int*> atomicPtr{nullptr};
//...
void thread1()
{
data = 42;
atomicPtr.store(&integer, std::memory_order_release);
}
void thread2()
{
int* ptr = nullptr;
while(!ptr)
ptr = atomicPtr.load(std::memory_order_consume);
assert(*ptr == 42);
}
因为你有 2 个进程,你不能在它们之间使用一个原子变量,但是因为你列出了 windows 你可以从消费部分省略 atomicPtr.load(std::memory_order_consume)
因为,据我所知,所有架构 Windows 是 运行 保证这个加载是正确的,加载侧没有任何障碍。事实上,我认为没有多少架构可以让该指令不是 NO-OP(我只听说过 DEC Alpha)
实际上,调用 write()
,这是一个系统调用,最终会在内核中锁定一个或多个数据结构,这应该会处理重新排序问题。例如,POSIX 需要后续读取才能看到在调用之前写入的数据,这本身就意味着锁定(或某种 acquire/release)。
至于这是否是调用的正式规范的一部分,可能不是。
指针只是一个内存地址,所以如果您在同一个进程中,指针在接收线程上将是有效的并且指向相同的结构。如果你在不同的进程中,充其量你会立即得到一个内存错误,更糟糕的是你会读(或写)到一个随机内存,这本质上是未定义的行为。
你会阅读正确的内容吗?如果您的指针位于两个线程共享的静态变量中,既不会好也不会坏:如果您想要一致性,您仍然需要进行一些 同步 。
静态内存(由线程共享)、匿名管道、套接字对、tcp 环回等之间的传输地址类型是否重要?否:所有这些通道都传输 字节 ,因此如果您传递一个内存地址,您将获得您的内存地址。剩下的就是同步,因为这里只是共享一个内存地址。
如果你不使用任何其他同步,任何事情都可能发生(我是否已经谈到未定义的行为?):
- 读取线程可以在写入内存之前访问内存,方法是写入一个提供陈旧数据的线程
- 如果您忘记将结构成员声明为 volatile,读取线程可以继续使用缓存值,这里再次获取陈旧数据
- 读取线程可以读取部分写入的数据,这意味着数据不连贯
我同意 Serge Ballesta 的回答。在同一个进程内,通过匿名管道发送和接收对象地址是可行的。
由于write
系统调用在消息大小低于PIPE_BUF
(通常为4096字节)时保证是原子的,所以多生产者线程不会弄乱彼此的对象地址(8 64 位应用程序的字节数)。
话不多说,这里是Linux的演示代码(为简单起见省略了防御代码和错误处理程序)。只需复制并粘贴到 pipe_ipc_demo.cc
然后编译 & 运行 测试。
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <string>
#include <list>
template<class T> class MPSCQ { // pipe based Multi Producer Single Consumer Queue
public:
MPSCQ();
~MPSCQ();
int producerPush(const T* t);
T* consumerPoll(double timeout = 1.0);
private:
void _consumeFd();
int _selectFdConsumer(double timeout);
T* _popFront();
private:
int _fdProducer;
int _fdConsumer;
char* _consumerBuf;
std::string* _partial;
std::list<T*>* _list;
static const int _PTR_SIZE;
static const int _CONSUMER_BUF_SIZE;
};
template<class T> const int MPSCQ<T>::_PTR_SIZE = sizeof(void*);
template<class T> const int MPSCQ<T>::_CONSUMER_BUF_SIZE = 1024;
template<class T> MPSCQ<T>::MPSCQ() :
_fdProducer(-1),
_fdConsumer(-1) {
_consumerBuf = new char[_CONSUMER_BUF_SIZE];
_partial = new std::string; // for holding partial pointer address
_list = new std::list<T*>; // unconsumed T* cache
int fd_[2];
int r = pipe(fd_);
_fdConsumer = fd_[0];
_fdProducer = fd_[1];
}
template<class T> MPSCQ<T>::~MPSCQ() { /* omitted */ }
template<class T> int MPSCQ<T>::producerPush(const T* t) {
return t == NULL ? 0 : write(_fdProducer, &t, _PTR_SIZE);
}
template<class T> T* MPSCQ<T>::consumerPoll(double timeout) {
T* t = _popFront();
if (t != NULL) {
return t;
}
if (_selectFdConsumer(timeout) <= 0) { // timeout or error
return NULL;
}
_consumeFd();
return _popFront();
}
template<class T> void MPSCQ<T>::_consumeFd() {
memcpy(_consumerBuf, _partial->data(), _partial->length());
ssize_t r = read(_fdConsumer, _consumerBuf, _CONSUMER_BUF_SIZE - _partial->length());
if (r <= 0) { // EOF or error, error handler omitted
return;
}
const char* p = _consumerBuf;
int remaining_len_ = _partial->length() + r;
T* t;
while (remaining_len_ >= _PTR_SIZE) {
memcpy(&t, p, _PTR_SIZE);
_list->push_back(t);
remaining_len_ -= _PTR_SIZE;
p += _PTR_SIZE;
}
*_partial = std::string(p, remaining_len_);
}
template<class T> int MPSCQ<T>::_selectFdConsumer(double timeout) {
int r;
int nfds_ = _fdConsumer + 1;
fd_set readfds_;
struct timeval timeout_;
int64_t usec_ = timeout * 1000000.0;
while (true) {
timeout_.tv_sec = usec_ / 1000000;
timeout_.tv_usec = usec_ % 1000000;
FD_ZERO(&readfds_);
FD_SET(_fdConsumer, &readfds_);
r = select(nfds_, &readfds_, NULL, NULL, &timeout_);
if (r < 0 && errno == EINTR) {
continue;
}
return r;
}
}
template<class T> T* MPSCQ<T>::_popFront() {
if (!_list->empty()) {
T* t = _list->front();
_list->pop_front();
return t;
} else {
return NULL;
}
}
// = = = = = test code below = = = = =
#define _LOOP_CNT 5000000
#define _ONE_MILLION 1000000
#define _PRODUCER_THREAD_NUM 2
struct TestMsg { // all public
int _threadId;
int _msgId;
int64_t _val;
TestMsg(int thread_id, int msg_id, int64_t val) :
_threadId(thread_id),
_msgId(msg_id),
_val(val) { };
};
static MPSCQ<TestMsg> _QUEUE;
static int64_t _SUM = 0;
void* functor_producer(void* arg) {
int my_thr_id_ = pthread_self();
TestMsg* msg_;
for (int i = 0; i <= _LOOP_CNT; ++ i) {
if (i == _LOOP_CNT) {
msg_ = new TestMsg(my_thr_id_, i, -1);
} else {
msg_ = new TestMsg(my_thr_id_, i, i + 1);
}
_QUEUE.producerPush(msg_);
}
return NULL;
}
void* functor_consumer(void* arg) {
int msg_cnt_ = 0;
int stop_cnt_ = 0;
TestMsg* msg_;
while (true) {
if ((msg_ = _QUEUE.consumerPoll()) == NULL) {
continue;
}
int64_t val_ = msg_->_val;
delete msg_;
if (val_ <= 0) {
if ((++ stop_cnt_) >= _PRODUCER_THREAD_NUM) {
printf("All done, _SUM=%ld\n", _SUM);
break;
}
} else {
_SUM += val_;
if ((++ msg_cnt_) % _ONE_MILLION == 0) {
printf("msg_cnt_=%d, _SUM=%ld\n", msg_cnt_, _SUM);
}
}
}
return NULL;
}
int main(int argc, char* const* argv) {
pthread_t consumer_;
pthread_create(&consumer_, NULL, functor_consumer, NULL);
pthread_t producers_[_PRODUCER_THREAD_NUM];
for (int i = 0; i < _PRODUCER_THREAD_NUM; ++ i) {
pthread_create(&producers_[i], NULL, functor_producer, NULL);
}
for (int i = 0; i < _PRODUCER_THREAD_NUM; ++ i) {
pthread_join(producers_[i], NULL);
}
pthread_join(consumer_, NULL);
return 0;
}
这是测试结果(2 * sum(1..5000000) == (1 + 5000000) * 5000000 == 25000005000000
):
$ g++ -o pipe_ipc_demo pipe_ipc_demo.cc -lpthread
$ ./pipe_ipc_demo ## output may vary except for the final _SUM
msg_cnt_=1000000, _SUM=251244261289
msg_cnt_=2000000, _SUM=1000708879236
msg_cnt_=3000000, _SUM=2250159002500
msg_cnt_=4000000, _SUM=4000785160225
msg_cnt_=5000000, _SUM=6251640644676
msg_cnt_=6000000, _SUM=9003167062500
msg_cnt_=7000000, _SUM=12252615629881
msg_cnt_=8000000, _SUM=16002380952516
msg_cnt_=9000000, _SUM=20252025092401
msg_cnt_=10000000, _SUM=25000005000000
All done, _SUM=25000005000000
此处展示的技术用于我们的生产应用程序。一种典型的用法是消费者线程充当日志编写器,而工作线程几乎可以异步写入日志消息。是的,almost 意味着当管道已满时,有时写入线程可能会在 write()
中被阻塞,这是提供的可靠的拥塞控制功能通过 OS.
有趣的问题,到目前为止,Cornstalks 只有一个正确答案。
在同一个 (multi-threaded) 进程中,没有任何保证,因为指针和数据遵循不同的路径到达目的地。
隐式 acquire/release 保证不适用,因为结构数据不能通过缓存搭载在指针上,并且正式地你正在处理数据竞争。
但是,查看指针和结构数据本身如何到达第二个线程(分别通过管道和内存缓存),这种机制很有可能不会造成任何伤害。
将指针发送到对等线程需要 3 次系统调用(发送线程中的 write()
,接收线程中的 select()
和 read()
)这是(相对)昂贵的并且指针值可用
在接收线程中,结构数据可能早就到达了。
请注意,这只是一个观察,机制仍然不正确。
例如,假设我用new分配一个struct并将指针写入匿名管道的写端。
如果我从相应的读取端读取指针,我能保证看到结构上的'correct'内容吗?
同样有趣的是 socketpair() 在 unix 上的结果和在 windows 上通过 tcp 环回自连接的结果是否具有相同的保证。
上下文是一种服务器设计,它通过 select/epoll
集中事件调度For example, say I allocate a struct with new and write the pointer into the write end of an anonymous pipe.
If I read the pointer from the corresponding read end, am I guaranteed to see the 'correct' contents on the struct?
没有。无法保证写入 CPU 会将写入从其缓存中清除并使其对可能进行读取的其他 CPU 可见。
Also of of interest is whether the results of socketpair() on unix & self connecting over tcp loopback on windows have the same guarantees.
没有
我相信,您的情况可能会减少到这个 2 线程模型:
int data = 0;
std::atomic<int*> atomicPtr{nullptr};
//...
void thread1()
{
data = 42;
atomicPtr.store(&integer, std::memory_order_release);
}
void thread2()
{
int* ptr = nullptr;
while(!ptr)
ptr = atomicPtr.load(std::memory_order_consume);
assert(*ptr == 42);
}
因为你有 2 个进程,你不能在它们之间使用一个原子变量,但是因为你列出了 windows 你可以从消费部分省略 atomicPtr.load(std::memory_order_consume)
因为,据我所知,所有架构 Windows 是 运行 保证这个加载是正确的,加载侧没有任何障碍。事实上,我认为没有多少架构可以让该指令不是 NO-OP(我只听说过 DEC Alpha)
实际上,调用 write()
,这是一个系统调用,最终会在内核中锁定一个或多个数据结构,这应该会处理重新排序问题。例如,POSIX 需要后续读取才能看到在调用之前写入的数据,这本身就意味着锁定(或某种 acquire/release)。
至于这是否是调用的正式规范的一部分,可能不是。
指针只是一个内存地址,所以如果您在同一个进程中,指针在接收线程上将是有效的并且指向相同的结构。如果你在不同的进程中,充其量你会立即得到一个内存错误,更糟糕的是你会读(或写)到一个随机内存,这本质上是未定义的行为。
你会阅读正确的内容吗?如果您的指针位于两个线程共享的静态变量中,既不会好也不会坏:如果您想要一致性,您仍然需要进行一些 同步 。
静态内存(由线程共享)、匿名管道、套接字对、tcp 环回等之间的传输地址类型是否重要?否:所有这些通道都传输 字节 ,因此如果您传递一个内存地址,您将获得您的内存地址。剩下的就是同步,因为这里只是共享一个内存地址。
如果你不使用任何其他同步,任何事情都可能发生(我是否已经谈到未定义的行为?):
- 读取线程可以在写入内存之前访问内存,方法是写入一个提供陈旧数据的线程
- 如果您忘记将结构成员声明为 volatile,读取线程可以继续使用缓存值,这里再次获取陈旧数据
- 读取线程可以读取部分写入的数据,这意味着数据不连贯
我同意 Serge Ballesta 的回答。在同一个进程内,通过匿名管道发送和接收对象地址是可行的。
由于write
系统调用在消息大小低于PIPE_BUF
(通常为4096字节)时保证是原子的,所以多生产者线程不会弄乱彼此的对象地址(8 64 位应用程序的字节数)。
话不多说,这里是Linux的演示代码(为简单起见省略了防御代码和错误处理程序)。只需复制并粘贴到 pipe_ipc_demo.cc
然后编译 & 运行 测试。
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <string>
#include <list>
template<class T> class MPSCQ { // pipe based Multi Producer Single Consumer Queue
public:
MPSCQ();
~MPSCQ();
int producerPush(const T* t);
T* consumerPoll(double timeout = 1.0);
private:
void _consumeFd();
int _selectFdConsumer(double timeout);
T* _popFront();
private:
int _fdProducer;
int _fdConsumer;
char* _consumerBuf;
std::string* _partial;
std::list<T*>* _list;
static const int _PTR_SIZE;
static const int _CONSUMER_BUF_SIZE;
};
template<class T> const int MPSCQ<T>::_PTR_SIZE = sizeof(void*);
template<class T> const int MPSCQ<T>::_CONSUMER_BUF_SIZE = 1024;
template<class T> MPSCQ<T>::MPSCQ() :
_fdProducer(-1),
_fdConsumer(-1) {
_consumerBuf = new char[_CONSUMER_BUF_SIZE];
_partial = new std::string; // for holding partial pointer address
_list = new std::list<T*>; // unconsumed T* cache
int fd_[2];
int r = pipe(fd_);
_fdConsumer = fd_[0];
_fdProducer = fd_[1];
}
template<class T> MPSCQ<T>::~MPSCQ() { /* omitted */ }
template<class T> int MPSCQ<T>::producerPush(const T* t) {
return t == NULL ? 0 : write(_fdProducer, &t, _PTR_SIZE);
}
template<class T> T* MPSCQ<T>::consumerPoll(double timeout) {
T* t = _popFront();
if (t != NULL) {
return t;
}
if (_selectFdConsumer(timeout) <= 0) { // timeout or error
return NULL;
}
_consumeFd();
return _popFront();
}
template<class T> void MPSCQ<T>::_consumeFd() {
memcpy(_consumerBuf, _partial->data(), _partial->length());
ssize_t r = read(_fdConsumer, _consumerBuf, _CONSUMER_BUF_SIZE - _partial->length());
if (r <= 0) { // EOF or error, error handler omitted
return;
}
const char* p = _consumerBuf;
int remaining_len_ = _partial->length() + r;
T* t;
while (remaining_len_ >= _PTR_SIZE) {
memcpy(&t, p, _PTR_SIZE);
_list->push_back(t);
remaining_len_ -= _PTR_SIZE;
p += _PTR_SIZE;
}
*_partial = std::string(p, remaining_len_);
}
template<class T> int MPSCQ<T>::_selectFdConsumer(double timeout) {
int r;
int nfds_ = _fdConsumer + 1;
fd_set readfds_;
struct timeval timeout_;
int64_t usec_ = timeout * 1000000.0;
while (true) {
timeout_.tv_sec = usec_ / 1000000;
timeout_.tv_usec = usec_ % 1000000;
FD_ZERO(&readfds_);
FD_SET(_fdConsumer, &readfds_);
r = select(nfds_, &readfds_, NULL, NULL, &timeout_);
if (r < 0 && errno == EINTR) {
continue;
}
return r;
}
}
template<class T> T* MPSCQ<T>::_popFront() {
if (!_list->empty()) {
T* t = _list->front();
_list->pop_front();
return t;
} else {
return NULL;
}
}
// = = = = = test code below = = = = =
#define _LOOP_CNT 5000000
#define _ONE_MILLION 1000000
#define _PRODUCER_THREAD_NUM 2
struct TestMsg { // all public
int _threadId;
int _msgId;
int64_t _val;
TestMsg(int thread_id, int msg_id, int64_t val) :
_threadId(thread_id),
_msgId(msg_id),
_val(val) { };
};
static MPSCQ<TestMsg> _QUEUE;
static int64_t _SUM = 0;
void* functor_producer(void* arg) {
int my_thr_id_ = pthread_self();
TestMsg* msg_;
for (int i = 0; i <= _LOOP_CNT; ++ i) {
if (i == _LOOP_CNT) {
msg_ = new TestMsg(my_thr_id_, i, -1);
} else {
msg_ = new TestMsg(my_thr_id_, i, i + 1);
}
_QUEUE.producerPush(msg_);
}
return NULL;
}
void* functor_consumer(void* arg) {
int msg_cnt_ = 0;
int stop_cnt_ = 0;
TestMsg* msg_;
while (true) {
if ((msg_ = _QUEUE.consumerPoll()) == NULL) {
continue;
}
int64_t val_ = msg_->_val;
delete msg_;
if (val_ <= 0) {
if ((++ stop_cnt_) >= _PRODUCER_THREAD_NUM) {
printf("All done, _SUM=%ld\n", _SUM);
break;
}
} else {
_SUM += val_;
if ((++ msg_cnt_) % _ONE_MILLION == 0) {
printf("msg_cnt_=%d, _SUM=%ld\n", msg_cnt_, _SUM);
}
}
}
return NULL;
}
int main(int argc, char* const* argv) {
pthread_t consumer_;
pthread_create(&consumer_, NULL, functor_consumer, NULL);
pthread_t producers_[_PRODUCER_THREAD_NUM];
for (int i = 0; i < _PRODUCER_THREAD_NUM; ++ i) {
pthread_create(&producers_[i], NULL, functor_producer, NULL);
}
for (int i = 0; i < _PRODUCER_THREAD_NUM; ++ i) {
pthread_join(producers_[i], NULL);
}
pthread_join(consumer_, NULL);
return 0;
}
这是测试结果(2 * sum(1..5000000) == (1 + 5000000) * 5000000 == 25000005000000
):
$ g++ -o pipe_ipc_demo pipe_ipc_demo.cc -lpthread
$ ./pipe_ipc_demo ## output may vary except for the final _SUM
msg_cnt_=1000000, _SUM=251244261289
msg_cnt_=2000000, _SUM=1000708879236
msg_cnt_=3000000, _SUM=2250159002500
msg_cnt_=4000000, _SUM=4000785160225
msg_cnt_=5000000, _SUM=6251640644676
msg_cnt_=6000000, _SUM=9003167062500
msg_cnt_=7000000, _SUM=12252615629881
msg_cnt_=8000000, _SUM=16002380952516
msg_cnt_=9000000, _SUM=20252025092401
msg_cnt_=10000000, _SUM=25000005000000
All done, _SUM=25000005000000
此处展示的技术用于我们的生产应用程序。一种典型的用法是消费者线程充当日志编写器,而工作线程几乎可以异步写入日志消息。是的,almost 意味着当管道已满时,有时写入线程可能会在 write()
中被阻塞,这是提供的可靠的拥塞控制功能通过 OS.
有趣的问题,到目前为止,Cornstalks 只有一个正确答案。
在同一个 (multi-threaded) 进程中,没有任何保证,因为指针和数据遵循不同的路径到达目的地。 隐式 acquire/release 保证不适用,因为结构数据不能通过缓存搭载在指针上,并且正式地你正在处理数据竞争。
但是,查看指针和结构数据本身如何到达第二个线程(分别通过管道和内存缓存),这种机制很有可能不会造成任何伤害。
将指针发送到对等线程需要 3 次系统调用(发送线程中的 write()
,接收线程中的 select()
和 read()
)这是(相对)昂贵的并且指针值可用
在接收线程中,结构数据可能早就到达了。
请注意,这只是一个观察,机制仍然不正确。