C++,Qt - 尽可能快地拆分 QByteArray
C++, Qt - splitting a QByteArray as fast as possible
我正在尝试拆分包含 UTF-8 编码纯文本(使用空格作为分隔符)的大量 QByteArray
,并尽可能获得最佳性能。我发现如果先将数组转换为 QString
可以获得更好的结果。我尝试使用正则表达式使用 QString.split
函数,但性能非常糟糕。事实证明这段代码更快:
QMutex mutex;
QSet<QString> split(QByteArray body)
{
QSet<QString> slova;
QString s_body = QTextCodec::codecForMib(106)->toUnicode(body);
QString current;
for(int i = 0; i< body.size(); i++){
if(s_body[i] == '\r' || s_body[i] == '\n' || s_body[i] == '\t' || s_body[i] == ' '){
mutex.lock();
slova.insert(current);
mutex.unlock();
current.clear();
current.reserve(40);
} else {
current.push_back(s_body[i]);
}
}
return slova;
}
"Slova" 目前是 QSet<QString>
,但我可以使用 std::set
或任何其他格式。这段代码应该找出数组中有多少个唯一的单词,并尽可能获得最佳性能。
遗憾的是,这段代码的运行速度远远不够快。我想从中挤出绝对最大值。
使用callgrind,我发现最贪吃的内部函数是:
QString::reallocData (18% absolute cost)
QString::append (10% absolute cost)
QString::operator= (8 % absolute cost)
QTextCodec::toUnicode (8% absolute cost)
显然,这与 push_back
函数的内存分配有关。解决这个问题的最佳方法是什么?不一定必须是 Qt 解决方案 - 纯 C 或 C++ 也是可以接受的。
如果我是你,我要做的第一件事就是修改你的代码,这样它就不会为它插入到 QSet 中的任何单词锁定和解锁 QMutex —— 这纯粹是开销。要么只在循环开始时锁定 QMutex 一次,然后在循环终止后再次解锁;或者更好的是,插入到任何其他线程都无法访问的 QSet,这样您根本不需要锁定任何 QMutex。
解决了这个问题后,要做的第二件事就是尽可能多地消除堆分配。理想情况下,您将执行整个解析,而根本不会分配或释放任何动态内存;我下面的实现就是这样做的(好吧,几乎 - unordered_set 可能 会做一些内部分配,但它可能不会)。在我的计算机(2.7GHz Mac Mini)上,我使用 Moby Dick 的古腾堡 ASCII 文本作为我的测试输入,测得处理速度约为每秒 1100 万字。
请注意,由于 UTF-8 使用的向后兼容编码,此程序在使用 UTF-8 或 ASCII 输入时同样可以正常工作。
#include <ctype.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <unordered_set>
// Loads in a text file from disk into an in-memory array
// Expected contents of the file are ASCII or UTF8 (doesn't matter which).
// Note that this function appends a space to the end of the returned array
// That way the parsing function doesn't have to include a special case
// since it is guaranteed that every word in the array ends with whitespace
static char * LoadFile(const char * fileName, unsigned long * retArraySizeBytes)
{
char * ret = NULL;
*retArraySizeBytes = 0;
FILE * fpIn = fopen(fileName, "r");
if (fpIn)
{
if (fseek(fpIn, 0L, SEEK_END) == 0)
{
const unsigned long fileSizeBytes = ftell(fpIn);
const unsigned long arraySizeBytes = *retArraySizeBytes = fileSizeBytes+1; // +1 because I'm going to append a space to the end
rewind(fpIn);
ret = new char[arraySizeBytes];
if (fread(ret, 1, fileSizeBytes, fpIn) == fileSizeBytes)
{
ret[fileSizeBytes] = ' '; // appending a space allows me to simplify the parsing step
}
else
{
perror("fread");
delete [] ret;
ret = NULL;
}
}
else perror("fseek");
fclose(fpIn);
}
return ret;
}
// Gotta provide our own equality-testing function otherwise unordered_set will just compare pointer values
struct CharPointersEqualityFunction : public std::binary_function<char *, char *,bool>
{
bool operator() (char * s1, char * s2) const {return strcmp(s1, s2) == 0;}
};
// Gotta provide our own hashing function otherwise unordered_set will just hash the pointer values
struct CharPointerHashFunction
{
int operator() (char * str) const
{
// djb2 by Dan Bernstein -- fast enough and simple enough
unsigned long hash = 5381;
int c; while((c = *str++) != 0) hash = ((hash << 5) + hash) + c;
return (int) hash;
}
};
typedef std::unordered_set<char *, CharPointerHashFunction, CharPointersEqualityFunction > CharPointerUnorderedSet;
int main(int argc, char ** argv)
{
if (argc < 2)
{
printf("Usage: ./split_words filename\n");
return 10;
}
unsigned long arraySizeBytes;
char * buf = LoadFile(argv[1], &arraySizeBytes);
if (buf == NULL)
{
printf("Unable to load input file [%s]\n", argv[1]);
return 10;
}
CharPointerUnorderedSet set;
set.reserve(100000); // trying to size (set) big enough that no reallocations will be necessary during the parse
struct timeval startTime;
gettimeofday(&startTime, NULL);
// The actual parsing of the text is done here
int wordCount = 0;
char * wordStart = buf;
char * wordEnd = buf;
char * bufEnd = &buf[arraySizeBytes];
while(wordEnd < bufEnd)
{
if (isspace(*wordEnd))
{
if (wordEnd > wordStart)
{
*wordEnd = '[=10=]';
set.insert(wordStart);
wordCount++;
}
wordStart = wordEnd+1;
}
wordEnd++;
}
struct timeval endTime;
gettimeofday(&endTime, NULL);
unsigned long long startTimeMicros = (((unsigned long long)startTime.tv_sec)*1000000) + startTime.tv_usec;
unsigned long long endTimeMicros = (((unsigned long long) endTime.tv_sec)*1000000) + endTime.tv_usec;
double secondsElapsed = ((double)(endTimeMicros-startTimeMicros))/1000000.0;
printf("Parsed %i words (%zu unique words) in %f seconds, aka %.0f words/second\n", wordCount, set.size(), secondsElapsed, wordCount/secondsElapsed);
//for (const auto& elem: set) printf("word=[%s]\n", elem);
delete [] buf;
return 0;
}
正如所怀疑的那样,您的最大成本是在 push_back
中导致频繁重新分配,因为您一次添加一个字符。为什么不提前搜索,然后使用 QString::mid()
:
一次追加所有数据
slova.insert(s_body.mid(beginPos, i - beginPos - 1));
其中 beginPos
保存当前子字符串开始的索引。不是在将每个字符插入 slova
之前将每个字符附加到 current
,而是一次性完成复制。复制子字符串后,向前搜索下一个 valid(不是分隔符)字符并将 beginPos
设置为等于该索引。
在(粗略的)代码中:
QString s_body = ...
//beginPos tells us the index of the current substring we are working
//with. -1 means the previous character was a separator
int beginPos = -1;
for (...) {
//basically your if statement provided in the question as a function
if (isSeparator(s_body[i])) {
//ignore double white spaces, etc.
if (beginPos != -1) {
mutex.lock();
slova.insert(s_body.mid(beginPos, i - beginPos - 1));
mutex.unlock();
}
} else if (beginPos == -1)
//if beginPos is not valid and we are not on a separator, we
//are at the start of a new substring.
beginPos = i;
}
这种方法将大大减少堆分配的开销并消除 QString::push_back()
次调用。
最后一点:QByteArray
还提供了 mid()
功能。您可以完全跳过到 QString
的转换并直接使用字节数组。
尽量减少您需要做的复制工作。将输入缓冲区保持为 UTF-8,并且不要在您的集合中存储 std::string
或 QString
;相反,创建一个小的 class 来引用现有的 UTF-8 数据:
#include <QString>
class stringref {
const char *start;
size_t length;
public:
stringref(const char *start, const char *end);
operator QString() const;
bool operator<(const stringref& other) const;
};
这可以封装UTF-8输入的一个子串。您需要确保它不会超过输入字符串;您可以通过巧妙地使用 std::shared_ptr
来做到这一点,但如果代码是相当独立的,那么它应该足够易于处理以推断生命周期。
我们可以将它从一对指针构造成我们的 UTF-8 数据,并在我们想要实际使用它时将其转换为 QString
:
stringref::stringref(const char *start, const char *end)
: start(start), length(end-start)
{}
stringref::operator QString() const
{
return QString::fromUtf8(start, length);
}
您需要定义 operator<
以便您可以在 std::set
.
中使用它
#include <cstring>
bool stringref::operator<(const stringref& other) const
{
return length == other.length
? std::strncmp(start, other.start, length) < 0
: length < other.length;
}
请注意,我们在取消引用指针之前按长度排序,以减少缓存影响。
现在我们可以编写split
方法:
#include <set>
#include <QByteArray>
std::set<stringref> split(const QByteArray& a)
{
std::set<stringref> words;
// start and end
const auto s = a.data(), e = s + a.length();
// current word
auto w = s;
for (auto p = s; p <= e; ++p) {
switch (*p) {
default: break;
case ' ': case '\r': case '\n': case '\t': case '[=13=]':
if (w != p)
words.insert({w, p});
w = p+1;
}
}
return words;
}
该算法几乎是您的算法,添加了 w!=p
测试,这样空白的运行就不会被计算在内。
让我们测试一下,并为重要的部分计时:
#include <QDebug>
#include <chrono>
int main()
{
QByteArray body{"foo bar baz\n foo again\nbar again "};
// make it a million times longer
for (int i = 0; i < 20; ++i)
body.append(body);
using namespace std::chrono;
const auto start = high_resolution_clock::now();
auto words = split(body);
const auto end = high_resolution_clock::now();
qDebug() << "Split"
<< body.length()
<< "bytes in"
<< duration_cast<duration<double>>(end - start).count()
<< "seconds";
for (auto&& word: words)
qDebug() << word;
}
我得到:
Split 35651584 bytes in 1.99142 seconds
"bar"
"baz"
"foo"
"again"
使用 -O3
编译将时间减少到 0.6188 秒,所以不要忘记向编译器求助!
如果这仍然不够快,可能是时候开始考虑任务的并行化了。您需要将字符串分成大致相等的长度,但要前进到下一个空白,这样就没有工作会跨越两个线程的工作量。每个线程都应该创建自己的结果集,然后归约步骤就是合并结果集。我不会为此提供完整的解决方案,因为这本身就是另一个问题。
我正在尝试拆分包含 UTF-8 编码纯文本(使用空格作为分隔符)的大量 QByteArray
,并尽可能获得最佳性能。我发现如果先将数组转换为 QString
可以获得更好的结果。我尝试使用正则表达式使用 QString.split
函数,但性能非常糟糕。事实证明这段代码更快:
QMutex mutex;
QSet<QString> split(QByteArray body)
{
QSet<QString> slova;
QString s_body = QTextCodec::codecForMib(106)->toUnicode(body);
QString current;
for(int i = 0; i< body.size(); i++){
if(s_body[i] == '\r' || s_body[i] == '\n' || s_body[i] == '\t' || s_body[i] == ' '){
mutex.lock();
slova.insert(current);
mutex.unlock();
current.clear();
current.reserve(40);
} else {
current.push_back(s_body[i]);
}
}
return slova;
}
"Slova" 目前是 QSet<QString>
,但我可以使用 std::set
或任何其他格式。这段代码应该找出数组中有多少个唯一的单词,并尽可能获得最佳性能。
遗憾的是,这段代码的运行速度远远不够快。我想从中挤出绝对最大值。
使用callgrind,我发现最贪吃的内部函数是:
QString::reallocData (18% absolute cost)
QString::append (10% absolute cost)
QString::operator= (8 % absolute cost)
QTextCodec::toUnicode (8% absolute cost)
显然,这与 push_back
函数的内存分配有关。解决这个问题的最佳方法是什么?不一定必须是 Qt 解决方案 - 纯 C 或 C++ 也是可以接受的。
如果我是你,我要做的第一件事就是修改你的代码,这样它就不会为它插入到 QSet 中的任何单词锁定和解锁 QMutex —— 这纯粹是开销。要么只在循环开始时锁定 QMutex 一次,然后在循环终止后再次解锁;或者更好的是,插入到任何其他线程都无法访问的 QSet,这样您根本不需要锁定任何 QMutex。
解决了这个问题后,要做的第二件事就是尽可能多地消除堆分配。理想情况下,您将执行整个解析,而根本不会分配或释放任何动态内存;我下面的实现就是这样做的(好吧,几乎 - unordered_set 可能 会做一些内部分配,但它可能不会)。在我的计算机(2.7GHz Mac Mini)上,我使用 Moby Dick 的古腾堡 ASCII 文本作为我的测试输入,测得处理速度约为每秒 1100 万字。
请注意,由于 UTF-8 使用的向后兼容编码,此程序在使用 UTF-8 或 ASCII 输入时同样可以正常工作。
#include <ctype.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <unordered_set>
// Loads in a text file from disk into an in-memory array
// Expected contents of the file are ASCII or UTF8 (doesn't matter which).
// Note that this function appends a space to the end of the returned array
// That way the parsing function doesn't have to include a special case
// since it is guaranteed that every word in the array ends with whitespace
static char * LoadFile(const char * fileName, unsigned long * retArraySizeBytes)
{
char * ret = NULL;
*retArraySizeBytes = 0;
FILE * fpIn = fopen(fileName, "r");
if (fpIn)
{
if (fseek(fpIn, 0L, SEEK_END) == 0)
{
const unsigned long fileSizeBytes = ftell(fpIn);
const unsigned long arraySizeBytes = *retArraySizeBytes = fileSizeBytes+1; // +1 because I'm going to append a space to the end
rewind(fpIn);
ret = new char[arraySizeBytes];
if (fread(ret, 1, fileSizeBytes, fpIn) == fileSizeBytes)
{
ret[fileSizeBytes] = ' '; // appending a space allows me to simplify the parsing step
}
else
{
perror("fread");
delete [] ret;
ret = NULL;
}
}
else perror("fseek");
fclose(fpIn);
}
return ret;
}
// Gotta provide our own equality-testing function otherwise unordered_set will just compare pointer values
struct CharPointersEqualityFunction : public std::binary_function<char *, char *,bool>
{
bool operator() (char * s1, char * s2) const {return strcmp(s1, s2) == 0;}
};
// Gotta provide our own hashing function otherwise unordered_set will just hash the pointer values
struct CharPointerHashFunction
{
int operator() (char * str) const
{
// djb2 by Dan Bernstein -- fast enough and simple enough
unsigned long hash = 5381;
int c; while((c = *str++) != 0) hash = ((hash << 5) + hash) + c;
return (int) hash;
}
};
typedef std::unordered_set<char *, CharPointerHashFunction, CharPointersEqualityFunction > CharPointerUnorderedSet;
int main(int argc, char ** argv)
{
if (argc < 2)
{
printf("Usage: ./split_words filename\n");
return 10;
}
unsigned long arraySizeBytes;
char * buf = LoadFile(argv[1], &arraySizeBytes);
if (buf == NULL)
{
printf("Unable to load input file [%s]\n", argv[1]);
return 10;
}
CharPointerUnorderedSet set;
set.reserve(100000); // trying to size (set) big enough that no reallocations will be necessary during the parse
struct timeval startTime;
gettimeofday(&startTime, NULL);
// The actual parsing of the text is done here
int wordCount = 0;
char * wordStart = buf;
char * wordEnd = buf;
char * bufEnd = &buf[arraySizeBytes];
while(wordEnd < bufEnd)
{
if (isspace(*wordEnd))
{
if (wordEnd > wordStart)
{
*wordEnd = '[=10=]';
set.insert(wordStart);
wordCount++;
}
wordStart = wordEnd+1;
}
wordEnd++;
}
struct timeval endTime;
gettimeofday(&endTime, NULL);
unsigned long long startTimeMicros = (((unsigned long long)startTime.tv_sec)*1000000) + startTime.tv_usec;
unsigned long long endTimeMicros = (((unsigned long long) endTime.tv_sec)*1000000) + endTime.tv_usec;
double secondsElapsed = ((double)(endTimeMicros-startTimeMicros))/1000000.0;
printf("Parsed %i words (%zu unique words) in %f seconds, aka %.0f words/second\n", wordCount, set.size(), secondsElapsed, wordCount/secondsElapsed);
//for (const auto& elem: set) printf("word=[%s]\n", elem);
delete [] buf;
return 0;
}
正如所怀疑的那样,您的最大成本是在 push_back
中导致频繁重新分配,因为您一次添加一个字符。为什么不提前搜索,然后使用 QString::mid()
:
slova.insert(s_body.mid(beginPos, i - beginPos - 1));
其中 beginPos
保存当前子字符串开始的索引。不是在将每个字符插入 slova
之前将每个字符附加到 current
,而是一次性完成复制。复制子字符串后,向前搜索下一个 valid(不是分隔符)字符并将 beginPos
设置为等于该索引。
在(粗略的)代码中:
QString s_body = ...
//beginPos tells us the index of the current substring we are working
//with. -1 means the previous character was a separator
int beginPos = -1;
for (...) {
//basically your if statement provided in the question as a function
if (isSeparator(s_body[i])) {
//ignore double white spaces, etc.
if (beginPos != -1) {
mutex.lock();
slova.insert(s_body.mid(beginPos, i - beginPos - 1));
mutex.unlock();
}
} else if (beginPos == -1)
//if beginPos is not valid and we are not on a separator, we
//are at the start of a new substring.
beginPos = i;
}
这种方法将大大减少堆分配的开销并消除 QString::push_back()
次调用。
最后一点:QByteArray
还提供了 mid()
功能。您可以完全跳过到 QString
的转换并直接使用字节数组。
尽量减少您需要做的复制工作。将输入缓冲区保持为 UTF-8,并且不要在您的集合中存储 std::string
或 QString
;相反,创建一个小的 class 来引用现有的 UTF-8 数据:
#include <QString>
class stringref {
const char *start;
size_t length;
public:
stringref(const char *start, const char *end);
operator QString() const;
bool operator<(const stringref& other) const;
};
这可以封装UTF-8输入的一个子串。您需要确保它不会超过输入字符串;您可以通过巧妙地使用 std::shared_ptr
来做到这一点,但如果代码是相当独立的,那么它应该足够易于处理以推断生命周期。
我们可以将它从一对指针构造成我们的 UTF-8 数据,并在我们想要实际使用它时将其转换为 QString
:
stringref::stringref(const char *start, const char *end)
: start(start), length(end-start)
{}
stringref::operator QString() const
{
return QString::fromUtf8(start, length);
}
您需要定义 operator<
以便您可以在 std::set
.
#include <cstring>
bool stringref::operator<(const stringref& other) const
{
return length == other.length
? std::strncmp(start, other.start, length) < 0
: length < other.length;
}
请注意,我们在取消引用指针之前按长度排序,以减少缓存影响。
现在我们可以编写split
方法:
#include <set>
#include <QByteArray>
std::set<stringref> split(const QByteArray& a)
{
std::set<stringref> words;
// start and end
const auto s = a.data(), e = s + a.length();
// current word
auto w = s;
for (auto p = s; p <= e; ++p) {
switch (*p) {
default: break;
case ' ': case '\r': case '\n': case '\t': case '[=13=]':
if (w != p)
words.insert({w, p});
w = p+1;
}
}
return words;
}
该算法几乎是您的算法,添加了 w!=p
测试,这样空白的运行就不会被计算在内。
让我们测试一下,并为重要的部分计时:
#include <QDebug>
#include <chrono>
int main()
{
QByteArray body{"foo bar baz\n foo again\nbar again "};
// make it a million times longer
for (int i = 0; i < 20; ++i)
body.append(body);
using namespace std::chrono;
const auto start = high_resolution_clock::now();
auto words = split(body);
const auto end = high_resolution_clock::now();
qDebug() << "Split"
<< body.length()
<< "bytes in"
<< duration_cast<duration<double>>(end - start).count()
<< "seconds";
for (auto&& word: words)
qDebug() << word;
}
我得到:
Split 35651584 bytes in 1.99142 seconds
"bar"
"baz"
"foo"
"again"
使用 -O3
编译将时间减少到 0.6188 秒,所以不要忘记向编译器求助!
如果这仍然不够快,可能是时候开始考虑任务的并行化了。您需要将字符串分成大致相等的长度,但要前进到下一个空白,这样就没有工作会跨越两个线程的工作量。每个线程都应该创建自己的结果集,然后归约步骤就是合并结果集。我不会为此提供完整的解决方案,因为这本身就是另一个问题。