在线程中使用 libarchive 读取存档内容时发生数据争用。我做错了什么?
Data race while reading the contents of an archive with libarchive in a thread. What did I do wrong?
我正在尝试并行读取和处理已加载到内存中的存档,但在线程中调用 libarchive 函数时出现了数据争用。Google 的 ThreadSanitizer 报告说,问题出在 get_archive_file_contents 中对 archive_read_open_memory 函数的调用。但我读到的资料说 libarchive 中的所有函数都应该是线程安全的。谁能告诉我我做错了什么?
这是我的线程代码。
/**
 * Worker loop: takes raw files from `raw_files_q`, builds a word-count map for
 * each one and pushes that map onto `words_q`.
 *
 * An entry whose buffer and extension are both empty is treated as the stop
 * marker. It is only peeked with front() and never popped, so it stays in the
 * queue after this worker leaves the loop.
 *
 * When the loop ends the worker decrements `current_indexing_threads` under
 * `m`; the worker that brings the counter to zero pushes an empty map onto
 * `words_q`.
 *
 * @param m                         mutex guarding `current_indexing_threads`.
 * @param current_indexing_threads  shared counter, decremented once on exit.
 * @param raw_files_q               input queue of (file bytes, extension) pairs.
 * @param words_q                   output queue of word -> count maps.
 */
void indexing_thread(std::mutex &m,
                     int &current_indexing_threads,
                     concurrent_queue<std::pair<std::string, std::string>> &raw_files_q,
                     concurrent_queue<std::map<std::string, size_t>> &words_q) {
    while (true) {
        // NOTE(review): front() and pop() are two separate queue operations. If several
        // workers share raw_files_q, two of them could peek the same element before either
        // pops it — confirm that concurrent_queue rules this out, or add an atomic
        // "take the front unless it is the stop marker" operation to the queue.
        auto raw_file = raw_files_q.front();
        const std::string &file_buffer = raw_file.first;
        const std::string &ext = raw_file.second;
        if (file_buffer.empty() && ext.empty()) {
            break;
        }
        raw_files_q.pop();

        // NOTE(review): only ".zip" inputs ever fill file_content; every other extension
        // produces the word map of an empty string — confirm this is intended.
        std::string file_content;
        if (ext == ".zip") {
            auto archive_contents = get_archive_content(file_buffer);
            // The original ran this loop twice with identical bodies; one pass yields the
            // same file_content and halves the extraction work.
            // As before, each ".txt" entry overwrites the previous one, so only the last
            // ".txt" entry of the archive is counted.
            for (const auto &entry_name : archive_contents) {
                if (boost::filesystem::extension(entry_name) == ".txt") {
                    file_content = get_archive_file_contents(entry_name, archive_contents, file_buffer);
                    file_content = convert_to_normalized_utf_string(file_content);
                }
            }
        }

        auto words = word_count_map_nonparallel(file_content);
        words_q.push_back(words);
    }

    // lock_guard releases the mutex even if push_back throws.
    std::lock_guard<std::mutex> guard(m);
    current_indexing_threads--;
    if (current_indexing_threads == 0) {
        words_q.push_back(std::map<std::string, size_t>{});
    }
}
get_archive_file_contents 代码:
std::string
get_archive_file_contents(const std::string &filename, std::vector<std::string> contents, std::string file_buffer) {
if (std::find(contents.begin(), contents.end(), filename) == contents.end()) {
throw FileDoesNotExistsException(filename);
}
struct archive_entry *entry;
struct archive *archive = archive_read_new();
archive_read_support_filter_all(archive);
archive_read_support_format_all(archive);
archive_read_support_format_raw(archive);
archive_read_support_format_empty(archive);
int reading_result = archive_read_open_memory(archive, file_buffer.data(), file_buffer.size());
if (reading_result != 0) {
throw std::runtime_error("Error reading archive");
}
void *buf;
int64_t length;
while (archive_read_next_header(archive, &entry) == ARCHIVE_OK) {
if (archive_entry_filetype(entry) == AE_IFREG) {
length = archive_entry_size(entry);
buf = malloc(length);
if (!buf) {
archive_read_data_skip(archive);
continue;
}
archive_read_data(archive, buf, length);
break;
}
}
std::string result = static_cast<char *>(buf);
return result;
}
UPD:Google 线程清理程序报告
我解决了。问题出在从 Ubuntu 软件源安装的 libarchive 二进制包上。改为从源代码编译安装 libarchive 之后,问题就消失了。
我正在尝试并行读取和处理已加载到内存中的存档,但在线程中调用 libarchive 函数时出现了数据争用。Google 的 ThreadSanitizer 报告说,问题出在 get_archive_file_contents 中对 archive_read_open_memory 函数的调用。但我读到的资料说 libarchive 中的所有函数都应该是线程安全的。谁能告诉我我做错了什么? 这是我的线程代码。
/**
 * Worker loop: takes raw files from `raw_files_q`, builds a word-count map for
 * each one and pushes that map onto `words_q`.
 *
 * An entry whose buffer and extension are both empty is treated as the stop
 * marker. It is only peeked with front() and never popped, so it stays in the
 * queue after this worker leaves the loop.
 *
 * When the loop ends the worker decrements `current_indexing_threads` under
 * `m`; the worker that brings the counter to zero pushes an empty map onto
 * `words_q`.
 *
 * @param m                         mutex guarding `current_indexing_threads`.
 * @param current_indexing_threads  shared counter, decremented once on exit.
 * @param raw_files_q               input queue of (file bytes, extension) pairs.
 * @param words_q                   output queue of word -> count maps.
 */
void indexing_thread(std::mutex &m,
                     int &current_indexing_threads,
                     concurrent_queue<std::pair<std::string, std::string>> &raw_files_q,
                     concurrent_queue<std::map<std::string, size_t>> &words_q) {
    while (true) {
        // NOTE(review): front() and pop() are two separate queue operations. If several
        // workers share raw_files_q, two of them could peek the same element before either
        // pops it — confirm that concurrent_queue rules this out, or add an atomic
        // "take the front unless it is the stop marker" operation to the queue.
        auto raw_file = raw_files_q.front();
        const std::string &file_buffer = raw_file.first;
        const std::string &ext = raw_file.second;
        if (file_buffer.empty() && ext.empty()) {
            break;
        }
        raw_files_q.pop();

        // NOTE(review): only ".zip" inputs ever fill file_content; every other extension
        // produces the word map of an empty string — confirm this is intended.
        std::string file_content;
        if (ext == ".zip") {
            auto archive_contents = get_archive_content(file_buffer);
            // The original ran this loop twice with identical bodies; one pass yields the
            // same file_content and halves the extraction work.
            // As before, each ".txt" entry overwrites the previous one, so only the last
            // ".txt" entry of the archive is counted.
            for (const auto &entry_name : archive_contents) {
                if (boost::filesystem::extension(entry_name) == ".txt") {
                    file_content = get_archive_file_contents(entry_name, archive_contents, file_buffer);
                    file_content = convert_to_normalized_utf_string(file_content);
                }
            }
        }

        auto words = word_count_map_nonparallel(file_content);
        words_q.push_back(words);
    }

    // lock_guard releases the mutex even if push_back throws.
    std::lock_guard<std::mutex> guard(m);
    current_indexing_threads--;
    if (current_indexing_threads == 0) {
        words_q.push_back(std::map<std::string, size_t>{});
    }
}
get_archive_file_contents 代码:
std::string
get_archive_file_contents(const std::string &filename, std::vector<std::string> contents, std::string file_buffer) {
if (std::find(contents.begin(), contents.end(), filename) == contents.end()) {
throw FileDoesNotExistsException(filename);
}
struct archive_entry *entry;
struct archive *archive = archive_read_new();
archive_read_support_filter_all(archive);
archive_read_support_format_all(archive);
archive_read_support_format_raw(archive);
archive_read_support_format_empty(archive);
int reading_result = archive_read_open_memory(archive, file_buffer.data(), file_buffer.size());
if (reading_result != 0) {
throw std::runtime_error("Error reading archive");
}
void *buf;
int64_t length;
while (archive_read_next_header(archive, &entry) == ARCHIVE_OK) {
if (archive_entry_filetype(entry) == AE_IFREG) {
length = archive_entry_size(entry);
buf = malloc(length);
if (!buf) {
archive_read_data_skip(archive);
continue;
}
archive_read_data(archive, buf, length);
break;
}
}
std::string result = static_cast<char *>(buf);
return result;
}
UPD:Google 线程清理程序报告
我解决了。问题出在从 Ubuntu 软件源安装的 libarchive 二进制包上。改为从源代码编译安装 libarchive 之后,问题就消失了。