R并行写入SEXP结构
R parallel write SEXP structure
我正在使用 C/C++ 代码开发 R 中的数据处理模块,主要是出于速度原因。这是我的问题的事实列表。
- 最终结果数据是一个字符串向量列表,占用 20MB 到 200MB 的内存。
- 数据处理可以适应single-producer/multiple-consumer模型。
- 我的数据
wrap
将 vector<vector<string> >
转换为 List
花费了大量时间。
因此我打算直接在 SEXP 结构中工作,这样我可能会节省最终转换的时间。我的主要功能如下所示。
boost::atomic<bool> done(false);
SEXP myfun(...) {
...
SEXP sdataStr;
PROTECT(sdataStr=allocVector(VECSXP, nElem));
vector<SEXP> dataStr(nElem);
for (int i=0; i<nElem; ++i) {
dataStr[i]=SET_VECTOR_ELT(sdataStr, i, allocVector(STRSXP, n));
}
Producer producer(&queue);
Consumer consumer1(dataStr, nElem, &queue);
Consumer consumer2(dataStr, nElem, &queue);
boost::thread produce(producer);
boost::thread consume1(consumer1);
boost::thread consume2(consumer2);
produce.join();
done=true;
consume1.join();
consume2.join();
UNPROTECT(1);
return sdataStr;
}
我的消费者class看起来像这样
class Consumer {
vector<SEXP>& m_dataStr;
boost::lockfree::queue<buffer>* m_queue;
buffer m_buffer;
public:
Consumer(vector<SEXP>& dataStr, boost::lockfree::queue<buffer>* queue) : m_dataStr(dataStr), m_queue(queue) {}
void operator()() {
while (!done) {
while (m_queue->pop(m_buffer)) {
process_item();
}
}
while (m_queue->pop(m_buffer)) {
process_item();
}
}
private:
process_item() {
...
// for some 0<=idx<nElem, 0<=i<n, some char* f and integer len
SET_STRING_ELT(m_dataStr[idx], i, mkCharLen(f,len));
...
}
}
这些是我唯一使用 Rinternals 的地方。程序的逻辑保证了不同线程写入同一个地方永远不会发生,即 idx
和 i
组合在 Consumer
class 中最多只能发生一次。我遇到了各种奇怪的问题,比如"stack imbalance",或者"snapping into wrong generation",等等。我有什么遗漏吗?或者不建议多线程调用SET_STRING_ELT?非常感谢!
C/R API 函数不应在线程中调用,除非您知道自己在做什么,例如 mkCharLen
可能会修改使用的内部散列 table对于所有 R 字符串,因此您不能在线程中调用它。 SET_STRING_ELT
也可能在线程中不可用,尤其是在写屏障打开的情况下。
我正在使用 C/C++ 代码开发 R 中的数据处理模块,主要是出于速度原因。这是我的问题的事实列表。
- 最终结果数据是一个字符串向量列表,占用 20MB 到 200MB 的内存。
- 数据处理可以适应single-producer/multiple-consumer模型。
- 我的数据
wrap
将vector<vector<string> >
转换为List
花费了大量时间。
因此我打算直接在 SEXP 结构中工作,这样我可能会节省最终转换的时间。我的主要功能如下所示。
boost::atomic<bool> done(false);
SEXP myfun(...) {
...
SEXP sdataStr;
PROTECT(sdataStr=allocVector(VECSXP, nElem));
vector<SEXP> dataStr(nElem);
for (int i=0; i<nElem; ++i) {
dataStr[i]=SET_VECTOR_ELT(sdataStr, i, allocVector(STRSXP, n));
}
Producer producer(&queue);
Consumer consumer1(dataStr, nElem, &queue);
Consumer consumer2(dataStr, nElem, &queue);
boost::thread produce(producer);
boost::thread consume1(consumer1);
boost::thread consume2(consumer2);
produce.join();
done=true;
consume1.join();
consume2.join();
UNPROTECT(1);
return sdataStr;
}
我的消费者class看起来像这样
class Consumer {
vector<SEXP>& m_dataStr;
boost::lockfree::queue<buffer>* m_queue;
buffer m_buffer;
public:
Consumer(vector<SEXP>& dataStr, boost::lockfree::queue<buffer>* queue) : m_dataStr(dataStr), m_queue(queue) {}
void operator()() {
while (!done) {
while (m_queue->pop(m_buffer)) {
process_item();
}
}
while (m_queue->pop(m_buffer)) {
process_item();
}
}
private:
process_item() {
...
// for some 0<=idx<nElem, 0<=i<n, some char* f and integer len
SET_STRING_ELT(m_dataStr[idx], i, mkCharLen(f,len));
...
}
}
这些是我唯一使用 Rinternals 的地方。程序的逻辑保证了不同线程写入同一个地方永远不会发生,即 idx
和 i
组合在 Consumer
class 中最多只能发生一次。我遇到了各种奇怪的问题,比如"stack imbalance",或者"snapping into wrong generation",等等。我有什么遗漏吗?或者不建议多线程调用SET_STRING_ELT?非常感谢!
C/R API 函数不应在线程中调用,除非您知道自己在做什么,例如 mkCharLen
可能会修改使用的内部散列 table对于所有 R 字符串,因此您不能在线程中调用它。 SET_STRING_ELT
也可能在线程中不可用,尤其是在写屏障打开的情况下。