Efficiently/concurrently 插入 unordered_map<>

Efficiently/concurrently insert into unordered_map<>

我需要使用以下算法(在 Python 中)为我的项目收集一些统计数据:

stats = defaultdict(list)
for s in L:
     current = []
     for q in L:
         stats[s, q] = copy(current)
         current = f(current, s, q)

因为列表 L 很大并且 f() 和复制 current 需要一些时间并且项目主要语言是 C++ 我决定选择 C++ 并使用它的多线程功能来实现我的算法.

我移动了那部分:

         stats[s, q] = copy(current)
         current = f(current, s, q)

到一个单独的 std::async,并在插入到 stats 时锁定 std::mutex,但这使事情变慢了。我尝试使用 tbb::concurrent_ordered_map,但这让事情变得更糟。

我编写了重现其行为的基准:https://gist.github.com/myaut/94ee59d9752f524d3da8

L 中 800 个条目的 2 x Xeon E5-2420 和 Debian 7 的结果:

single-threaded                       8157ms
async                mutex            10145ms
async with chunks    mutex            9618ms
threads              mutex            9378ms
async                tbb              10173ms
async with chunks    tbb              9560ms
threads              tbb              9418ms

我不明白为什么 TBB 是最慢的(似乎 tbb::concurrent_ordered_map 分配了更多的内存,但为了什么)。还有其他选项可以帮助我吗?

编辑:我已经用建议的方法更新了我的基准(并将 N 减少到 800)。看来问题出在其他地方...

EDIT2:我发现了其中一个问题 -- 应用程序消耗大量内存,因此 Xen Hypervisor 成为瓶颈 -- 在本机模式下重新启动,现在是多线程模式只比单线程慢一点

EDIT3:似乎多线程的问题是复制时的大量分配 list:

mprotect()
_int_malloc+0xcba/0x13f0
__libc_malloc+0x70/0x260
operator new(unsigned long)+0x1d/0x90
__gnu_cxx::new_allocator<int>::allocate(unsigned long, void const*)+0x40/0x42
std::_Vector_base<int, std::allocator<int> >::_M_allocate(unsigned long)+0x2f/0x38
std::_Vector_base<int, std::allocator<int> >::_M_create_storage(unsigned long)+0x23/0x58
std::_Vector_base<int, std::allocator<int> >::_Vector_base(unsigned long, std::allocator<int> const&)+0x3b/0x5e
std::vector<int, std::allocator<int> >::vector(std::vector<int, std::allocator<int> > const&)+0x55/0xf0
void threaded_process<concurrent_map_of_list_of_lists>(concurrent_map_of_list_of_lists&, std::vector<int, std::allocator<int> > const&)::{lambda(__gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, int)#1}::operator()(__gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, int) const+0x5f/0x1dc
_ZNSt12_Bind_simpleIFZ16threaded_processI31concurrent_map_of_list_of_listsEvRT_RKSt6vectorIiSaIiEEEUlN9__gnu_cxx17__normal_iteratorIPKiS6_EESD_iE_SD_SD_iEE9_M_invokeIJLm0ELm1ELm2EEEEvSt12_Index_tupleIJXspT_EEE+0x7c/0x87
std::_Bind_simple<void threaded_process<concurrent_map_of_list_of_lists>(concurrent_map_of_list_of_lists&, std::vector<int, std::allocator<int> > const&)::{lambda(__gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, int)#1} (__gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, int)>::operator()()+0x1b/0x28
std::thread::_Impl<std::_Bind_simple<void threaded_process<concurrent_map_of_list_of_lists>(concurrent_map_of_list_of_lists&, std::vector<int, std::allocator<int> > const&)::{lambda(__gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, int)#1} (__gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int const*, std::vector<int, std::allocator<int> > >, int)> >::_M_run()+0x1c/0x1e
std::error_code::default_error_condition() const+0x40/0xc0
start_thread+0xd0/0x300
clone+0x6d/0x90

问题是当堆 space 耗尽时,libc 调用 grow_heap(),通常只添加一页,但随后它调用 mprotect(),后者在内核中调用 validate_mm() . validate_mm() 似乎使用信号量锁定了整个 VMA。我用 tbb::scalable_allocator 替换了默认的 list 分配器,太棒了!现在 tbb 比单处理器方法快 2 倍。

感谢您的帮助,我将在 std::async.

中使用@Dave 方法处理大量工作

如果 f(current, s, q) 和复制 current 是微不足道的便宜,那么通过多线程进行扩展将很难值得开销。但是,我想我会使用无锁 hash/unordered 映射(tbb::concurrent_hash_map?我不知道待定)并使用 std::async 启动整个内部 for 循环。这个想法是用 std::async 启动一个体面的工作块,如果它太小而你启动一百万个琐碎的任务,那么使用 std::async 的开销将超过任务必须完成的工作!

另请注意,当您使用 std::async 时,您需要将返回的 future 保存在某处,否则它最终会阻塞,直到任务在 future 的析构函数中完成,为您购买多线程开销并且根本没有并行处理。您现在可能 运行 喜欢它。这非常令人讨厌,我希望它不是那样工作的。