如何在没有内存泄漏的情况下同时(线程安全)填充 c++11 std::map<std::string,std::bitset<N>*>?

How to populate c++11 std::map<std::string,std::bitset<N>*> concurrently (thread-safe) without memory leakage?

基本上,我需要从同时读取的数千个文件中填充数百万个关键条目(或多或少 5000 万个订单)的 std::map。这些键将指向的值将从堆中分配(std::bitset 类型)。

std::map<std::string,std::bitset<BITSET_SIZE>*> my_map;
  1. 我的第一个问题是:我不想要两个线程(首先检查一个 键存在,如果不存在,) 从堆中分配 space。 因为我只能持有一个指针,而其他分配会 导致内存泄漏,因为我无法跟踪它们。

    //count should be thread-safe, since it's defined as const in <map> header file
    if(my_map.count(key) == 0){
        //some other thread may have initialized the key in the mean time
        my_map[key] = new std::bitset<BITSET_SIZE>();
        //Now I will lose the pointer to previous heap allocation from other thread
    }
    

    一个解决方案是有一些互斥机制,比如 boost::unique_lockboost::shared_lock 的一些巧妙组合 boost::unique_lock 为了性能,我很乐意听取您的想法。

  2. 假设我已经完成了第一部分,意思是;在没有内存泄漏的情况下同时初始化 my_map 的键,任务的第二部分是同时操作值 (std::bitset)。为此,我认为应该没有任何问题,因为根据我的设置,可以保证不会有两个线程同时处理同一个键。 (任何线程都不会为 my_map 的键添加新键或从底层树结构中删除键)

首先 - const 函数不是线程安全的。考虑:

struct A {
  int q;
  void set(int qq) { q = qq; }
  int get() const { return q; }
};

get() 不是线程安全的 - 可能会调用其他线程集,这将修改 q。如果你想要线程安全,你必须使用原子结构锁定或更新(还有其他多线程问题,如果你不锁定/使用原子会发生,但这些不在你的问题范围内 - 你绝对需要那些!)。

现在开始解决: 由于您需要显式同步对地图结构的访问,因此您的问题不再是问题:

std::mutex m; // since c++11
...
{
    std::lock_guard _l(m); // since c++11
    if (!my_map.emplace(key, bitset_ptr).second)
        delete bitset_ptr;
}

这将使用键 key 和值 bitset_ptr 将元素插入 my_map ,但前提是它不存在。它将 return 两个元素的元组 - 第一个是创建的元素和先前存在的元素的迭代器,第二个是布尔标志,如果元素已创建则为真,如果元素之前存在则为假。因此,如果元素已经插入并且没有内存泄漏,您只需删除 bitset_ptr 即可。请注意,由于同步量,这可能会很慢。

更新: 你显然需要使用互斥锁来同步对 my_map 的任何访问,只要你在多个线程中保持更新。

更新 2: op 已经尝试了最简单的解决方案,发现速度不够快。让我们更深入一点。 (注意:最佳做法是衡量应用程序的性能并找到代码花费大部分时间的地方,但我不能那样做;))。很少有 "obvious"(阅读:可能)导致减速的原因:

  • 插入地图 - 插入排序地图具有 O(ln n) 运行时性能,实际上可能由于缓存不匹配而非常慢。在平均 100 万个元素映射中,您需要与 10 个不同的字符串进行比较,这些字符串(可能)位于完全不同的内存区域,因此始终从处理器缓存中清除彼此。
  • 从文件中读取 - 从多个文件中读取小块可能(也可能不会!)对整体速度不利。
  • 多重分配 - 通常内存分配很慢。此外,大量分配会增加内存碎片并减少局部性。
  • 同步锁定 - 这对任何事情都不健康...

我假设,您不能轻易地(便宜地)确定单个文件中元素的数量和总数。先上代码:

using etype = pair<string, bitset<N>*>;
vector<etype> all_elements;
mutex all_elements_mutex;
void parse_single_file_in_thread(...) {
    vector<etype> tmp;
    for(auto element : parse_element_from_file()) 
        tmp.push_back(move(element));
    lock_guard _l(all_elements_mutex);
    for(auto &a : tmp) all_elements.push_back(move(a));
}
map<string, bitset<N>*> parse_all_files() {
    // create threads, parse files in them and wait for them to finish
    std::sort(all_elements.begin(), all_elements.end(), 
        [](const etype &a, const etype &b) { return a.first < b.first; });
    map<string, bitset<N>*> tmp;
    for(auto &a : all_elements) if (!tmp.insert(tmp.end(), etype(move(a.first), a.second)).second) delete a.second;
    all_elements.clear();
    return tmp;
}

它的作用是: - 首先将键插入向量(忽略重复检查),稍后将对其进行排序并插入带有放置提示的地图(它们已排序,因此我们始终知道插入下一个元素的正确位置 - 地图的末尾),这是比直接插入地图快得多 - 每个文件的项目都首先放入它自己的向量中,并在解析整个文件后移动到全局向量中,这最大限度地减少了锁定

这应该足以提高性能。接下来的事情是用其他东西替换字符串以避免如此多的字符串到字符串排序比较。但这很容易超出范围。 ;)

注意:我是凭记忆写的整个代码,所以它可能无法编译并且可能需要 c++17。

conststd:: 容器(如 map)的访问保证在不同线程中是合法的,即使没有同步。

任何不同步的非const访问都会使任何其他访问(const或非const)非法(程序行为未定义)。

有些操作不是const,但就同步而言是const。例如,非常量 find 被视为 "as const",[]vector.

上也是如此 地图上的

[] 不是 const,也不被视为 const。我不确定不创建元素的 [] 是否被视为 const,我将不得不仔细检查标准。由于 find 存在并通过定义明确的语义解决了相同的问题,因此我无论如何都不会在代码中使用它。

const并不意味着线程安全,它意味着与其他const操作线程安全.线程安全是两个或多个代码位之间的关系,它不是绝对的。因此在其他人插入时调用 .count 是不合法的。

一般来说,共享是线程安全的祸根。解决此问题的更简单方法是为每个 "task" 提供自己的 map 以供使用。然后将这些 map 合并回主地图。

合并的复杂程度和发生频率取决于具体的应用程序以及有多少复制。

最简单的解决方案是:

std::map<std::string, std::unique_ptr<std::bitset<BITSET_SIZE>>>
parse_file( some_file_handle );

然后

std::map<std::string, std::unique_ptr<std::bitset<BITSET_SIZE>>>
parse_files( gsl::span<some_file_handle> handles ) {
  if (handles.size()==0) return {};
  if (handles.size()==1) return parse_file(handles.front());
  auto lhs = parse_files( handles.first(handles.size()/2) );
  auto rhs = parse_files( handles.last(handles.size()-handles.size()/2) );
  return merge_maps(std::move(lhs), std::move(rhs));
}

为我们提供了单线程版本。我们通过以下方式对其进行多线程处理:

std::map<std::string, std::unique_ptr<std::bitset<BITSET_SIZE>>>
parse_files( gsl::span<some_file_handle> handles, executor exec ) {
  if (handles.size()==0) return {};
  if (handles.size()==1) return parse_file(handles.front());
  auto lhs = exec( [handles]{parse_files(handles.first(handles.size()/2) )} );
  auto rhs = exec( [handles]{parse_files(handles.last(handles.size()-handles.size()/2) )} );
  auto retval = exec( [lhs=std::move(lhs], rhs=std::move[rhs]]()mutable{
    return merge_maps(std::move(lhs).get(), std::move(rhs).get() );
  }
  return std::move(retval).get();
}

其中 executor 接受类型为 T() 和 return 的对象 future<T>。天真的执行者只是 运行 函数和 return 一个现成的未来。更高级的执行者使用 std::async 将其关闭。一个更高级的使用线程池,当等待时使用等待线程 运行 任务,如果它还没有 运行ning.

现在,像 ppl 或 Intel 的 TBB 这样的并发库提供了很容易做到这一点的方法。