使用 boost::iostreams::mapped_file_source 和 std::multimap

Using boost::iostreams::mapped_file_source with std::multimap

我要分析的数据量很大 - 每个文件大约有 5gigs。每个文件的格式如下:

xxxxx yyyyy

key和value都可以重复,但是key是按升序排列的。为此,我正在尝试使用内存映射文件,然后找到所需的键并使用它们。这是我写的:

if (data_file != "")
{
    clock_start = clock();
    data_file_mapped.open(data_file);

    data_multimap = (std::multimap<double, unsigned int> *)data_file_mapped.data();
    if (data_multimap != NULL)
    {
        std::multimap<double, unsigned int>::iterator it = data_multimap->find(keys_to_find[4]);
        if (it != data_multimap->end())
        {
            std::cout << "Element found.";
            for (std::multimap<double, unsigned int>::iterator it = data_multimap->lower_bound(keys_to_find[4]); it != data_multimap->upper_bound(keys_to_find[5]); ++it)
            {
                std::cout << it->second;
            }
            std::cout << "\n";
            clock_end = clock();

            std::cout << "Time taken to read in the file: " << (clock_end - clock_start)/CLOCKS_PER_SEC << "\n";
        }
        else
            std::cerr << "Element not found at all" << "\n";
    }
    else
        std::cerr << "Nope - no data received."<< "\n";
}

基本上,我需要定位键的范围并将这些块拉出来进行处理。我第一次尝试在多图上使用方法时遇到段错误。例如,当 find 方法被调用时。 upper_boundlower_bound等方法也试过了,还是报错。

这就是 gdb 给我的:

Program received signal SIGSEGV, Segmentation fault.
_M_lower_bound (this=<optimized out>, __k=<optimized out>, __y=<optimized out>, __x=0xa31202030303833) at /usr/include/c++/4.9.2/bits/stl_tree.h:1261
1261            if (!_M_impl._M_key_compare(_S_key(__x), __k))

有人可以指出我做错了什么吗?我只能找到关于内存映射文件的简单示例 - 还没有这样的。谢谢。

编辑:更多信息:

我上面描述的文件基本上是一个两列的纯文本文件,神经模拟器给我作为我的模拟输出。很简单:

$ du -hsc 201501271755.e.ras 
4.9G    201501271755.e.ras
4.9G    total
$ head 201501271755.e.ras 
0.013800  0
0.013800  1
0.013800  10
0.013800  11
0.013800  12
0.013800  13
0.013800  14
0.013800  15
0.013800  16
0.013800  17

第一列是时间,第二列是此时激发的神经元——(这是一个尖峰时间光栅文件)。实际上,输出是来自每个 MPI 等级的文件,用于 运行 模拟。使用 sort -g -m 将各种文件合并到这个主文件中。有关文件格式的更多信息,请参见:http://www.fzenke.net/auryn/doku.php?id=manual:ras

为了计算在模拟的特定时间设置的神经元的放电率和其他指标,我需要 - 在文件中找到时间,在 [time -1,time] 和 [= 之间拉出一个块44=] 这个块上的一些指标等等。这个文件非常小,我希望随着我的模拟变得越来越复杂并且 运行 持续更长的时间,大小会增加很多。这就是我开始研究内存映射文件的原因。我希望这能在一定程度上澄清问题陈述。我只需要读取输出文件来处理它包含的信息。我根本不需要修改这个文件。

为了处理数据,我将再次使用多线程,但由于我在地图上的所有操作都是只读的,所以我不希望 运行 在那里遇到麻烦。

data_multimap = (std::multimap<double, unsigned int> *)data_file_mapped.data();,据我从 boost documentation 中读到的,你误解了那个函数,那个转换不起作用,你需要用 [提供的 char* 填充 multimap =11=]

我编辑添加更详细的内容,例如映射后,你可以做

std::getline(data_file_mapped, oneString);

然后,在线传递内容(您可以使用字符串流来完成该任务)并填充您的多图。

重复该过程直到文件结束。

多张地图未按顺序排列在内存中。 (它们是基于节点的容器,但我离题了)。事实上,即使是,布局与文本输入匹配的可能性也很小。

基本上有两种方法可以完成这项工作:

  1. 继续使用 multimap 但使用自定义分配器(以便所有分配都在映射的内存区域中完成)。从高级 C++ 的角度来看,这是“最好的”,/但是/您需要将文件更改为二进制格式。

如果可以的话,这就是我的建议。 Boost Container + Boost Interprocess 拥有让这一切变得相对轻松所需的一切。

  1. 您编写了一个直接作用于映射数据的自定义容器“抽象”。你可以

    • 从任何地方识别“xxxx yyyy”对(行结束?)或
    • 为文件中所有行的开头建立索引。

使用这些,您可以设计一个交互器(Boost Iterator iterator_facade),您可以使用它来实现更高级别的操作(lower_boundupper_boundequal_range)。

有了这些之后,您基本上就可以将此内存映射作为只读键值数据库进行查询了。

遗憾的是,如果您还想支持变异操作(insertremove),这种内存表示形式 对性能非常不利

如果您有文件的实际示例,我可以演示所描述的任何一种方法。

更新

快速样品:

  1. 使用 boost::interprocess 你可以(非常)简单地定义你想要的多图:

    namespace shared {
        namespace bc = boost::container;
    
        template <typename T> using allocator = bip::allocator<T, bip::managed_mapped_file::segment_manager>;
        template <typename K, typename V>
            using multi_map = bc::flat_multimap<
                K, V, std::less<K>, 
                allocator<typename bc::flat_multimap<K, V>::value_type> >;
    }
    

    备注:

    • 我选择了flatmap(实际上是flat_multimap)因为它可能更多 存储效率高,与第二种方法相比更具可比性 (如下所示);

      请注意,此选择会影响 iterator/reference 稳定性并将 非常喜欢只读操作。如果你需要迭代器 稳定性 and/or 许多变异操作,使用常规 map (或对于 非常高的数量 hash_map) 而不是 平坦 变化。

    • 我为这个演示选择了一个 managed_mapped_file 片段(所以你会得到持久性)。该演示显示了如何稀疏地预分配 10G,但只有实际分配的 space 在磁盘上 使用 。您同样可以使用 managed_shared_memory.

      如果您有二进制持久性,您可能完全丢弃文本数据文件。

    • 我使用 Boost Spirit 将文本数据从 mapped_file_source 解析为 shared::multi_map<double, unsigned>。该实现是完全通用的。

    • 不用写iterator 类, start_of_line(), end_of_line(), lower_bound(), upper_bound() , equal_range() 或其中任何一个,因为它们已经是 multi_map 接口中的标准,所以我们只需要编写 main:

    Live On Coliru

    #define NDEBUG
    #undef DEBUG
    #include <boost/iostreams/device/mapped_file.hpp>
    #include <boost/fusion/adapted/std_pair.hpp>
    #include <boost/container/flat_map.hpp>
    #include <boost/interprocess/managed_mapped_file.hpp>
    #include <boost/spirit/include/qi.hpp>
    #include <iomanip>
    
    namespace bip = boost::interprocess;
    namespace qi = boost::spirit::qi;
    
    namespace shared {
        namespace bc = boost::container;
    
        template <typename T> using allocator = bip::allocator<T, bip::managed_mapped_file::segment_manager>;
        template <typename K, typename V>
            using multi_map = bc::flat_multimap<
                K, V, std::less<K>, 
                allocator<typename bc::flat_multimap<K, V>::value_type> >;
    }
    
    #include <iostream>
    
    bip::managed_mapped_file msm(bip::open_or_create, "lookup.bin", 10ul<<30);
    
    template <typename K, typename V>
    shared::multi_map<K,V>& get_or_load(const char* fname) {
        using Map = shared::multi_map<K, V>;
        Map* lookup = msm.find_or_construct<Map>("lookup")(msm.get_segment_manager());
    
        if (lookup->empty()) { 
            // only read input file if not already loaded
            boost::iostreams::mapped_file_source input(fname);
            auto f(input.data()), l(f + input.size());
    
            bool ok = qi::phrase_parse(f, l,
                    (qi::auto_ >> qi::auto_) % qi::eol >> *qi::eol, 
                    qi::blank, *lookup);
    
            if (!ok || (f!=l))
                throw std::runtime_error("Error during parsing at position #" + std::to_string(f - input.data()));
        }
    
        return *lookup;
    }
    
    int main() {
        // parse text file into shared memory binary representation
        auto const& lookup = get_or_load<double, unsigned int>("input.txt");
        auto const e = lookup.end();
    
        for(auto&& line : lookup)
        {
            std::cout << line.first << "\t" << line.second << "\n";
    
            auto er = lookup.equal_range(line.first);
    
            if (er.first  != e) std::cout << " lower: " << er.first->first  << "\t" << er.first->second  << "\n";
            if (er.second != e) std::cout << " upper: " << er.second->first << "\t" << er.second->second << "\n";
        }
    }
    
  2. 我完全按照我描述的方式实现了它:

    • 映射原始 const char* 区域的简单容器;

    • 使用 boost::iterator_facade 制作一个迭代器,在取消引用时解析文本;

    • 用于打印我使用的输入行 boost::string_ref - 这避免了复制字符串的动态分配。

    • 用灵气解析:

        if (!qi::phrase_parse(
                    b, _data.end,
                    qi::auto_ >> qi::auto_ >> qi::eoi,
                    qi::space,
                    _data.key, _data.value)) 
      

      选择 Qi 是为了速度和通用性:您可以在实例化时选择 KeyValue 类型:

        text_multi_lookup<double, unsigned int> tml(map.data(), map.data() + map.size());
      
    • 我已经实现了 lower_boundupper_boundequal_range 成员函数,它们利用了底层的连续存储。即使“线”iterator 不是随机访问而是双向的,我们仍然可以跳转到这样一个迭代器范围的 mid_point 因为我们可以从任何 [=38] 获得 start_of_line =] 进入底层映射区域。这使二进制搜索变得高效。

    请注意,此解决方案在取消引用 iterator 时解析行。如果多次取消引用相同的行,这可能效率不高。

    但是,对于不频繁的查找,或在输入数据的同一区域中不典型的查找,这将尽可能高效(仅进行最少需要的解析和 O(log n) 二进制搜索), 同时完全通过映射文件来绕过初始加载时间(没有访问意味着不需要加载任何东西)。

    Live On Coliru(含测试数据)

    #define NDEBUG
    #undef DEBUG
    #include <boost/iostreams/device/mapped_file.hpp>
    #include <boost/utility/string_ref.hpp>
    #include <boost/optional.hpp>
    #include <boost/spirit/include/qi.hpp>
    #include <thread>
    #include <iomanip>
    
    namespace io = boost::iostreams;
    namespace qi = boost::spirit::qi;
    
    template <typename Key, typename Value> 
    struct text_multi_lookup {
        text_multi_lookup(char const* begin, char const* end)
            : _map_begin(begin), 
              _map_end(end)
        {
        }
    
      private:
        friend struct iterator;
        enum : char { nl = '\n' };
    
        using rawit = char const*;
        rawit _map_begin, _map_end;
    
        rawit start_of_line(rawit it) const {
            while (it > _map_begin) if (*--it == nl) return it+1;
            assert(it == _map_begin);
            return it;
        }
    
        rawit end_of_line(rawit it) const {
            while (it < _map_end) if (*it++ == nl) return it;
            assert(it == _map_end);
            return it;
        }
    
      public:
        struct value_type final {
            rawit beg, end;
            Key   key;
            Value value;
    
            boost::string_ref str() const { return { beg, size_t(end-beg) }; }
        };
    
        struct iterator : boost::iterator_facade<iterator, boost::string_ref, boost::bidirectional_traversal_tag, value_type> {
    
            iterator(text_multi_lookup const& d, rawit it) : _region(&d), _data { it, nullptr, Key{}, Value{} } { 
                assert(_data.beg == _region->start_of_line(_data.beg));
            }
    
          private:
            friend text_multi_lookup;
    
            text_multi_lookup const* _region;
            value_type mutable _data;
    
            void ensure_parsed() const {
                if (!_data.end) 
                {
                    assert(_data.beg == _region->start_of_line(_data.beg));
                    auto b = _data.beg;
                    _data.end = _region->end_of_line(_data.beg);
    
                    if (!qi::phrase_parse(
                                b, _data.end,
                                qi::auto_ >> qi::auto_ >> qi::eoi,
                                qi::space,
                                _data.key, _data.value)) 
                    {
                        std::cerr << "Problem in: " << std::string(_data.beg, _data.end) 
                                  << "at:         " << std::setw(_data.end-_data.beg) << std::right << std::string(_data.beg,_data.end);
                        assert(false);
                    }
                }
            }
    
            static iterator mid_point(iterator const& a, iterator const& b) {
                assert(a._region == b._region);
                return { *a._region, a._region->start_of_line(a._data.beg + (b._data.beg -a._data.beg)/2) };
            }
    
          public:
            value_type const& dereference() const {
                ensure_parsed();
                return _data;
            }
    
            bool equal(iterator const& o) const {
                return (_region == o._region) && (_data.beg == o._data.beg);
            }
    
            void increment() {
                _data = { _region->end_of_line(_data.beg), nullptr, Key{}, Value{} };
                assert(_data.beg == _region->start_of_line(_data.beg));
            }
        };
    
        using const_iterator = iterator;
    
        const_iterator begin()  const { return { *this, _map_begin }; }
        const_iterator end()    const { return { *this, _map_end   }; }
        const_iterator cbegin() const { return { *this, _map_begin }; }
        const_iterator cend()   const { return { *this, _map_end   }; }
    
        template <typename CompatibleKey>
        const_iterator lower_bound(CompatibleKey const& key) const {
            auto f(begin()), l(end());
            while (f!=l) {
                auto m = iterator::mid_point(f,l);
    
                if (m->key < key) {
                    f = m;
                    ++f;
                }
                else {
                    l = m;
                }
            }
            return f;
        }
    
        template <typename CompatibleKey>
        const_iterator upper_bound(CompatibleKey const& key) const {
            return upper_bound(key, begin());
        }
    
      private:
        template <typename CompatibleKey>
        const_iterator upper_bound(CompatibleKey const& key, const_iterator f) const {
            auto l(end());
            while (f!=l) {
                auto m = iterator::mid_point(f,l);
    
                if (key < m->key) {
                    l = m;
                }
                else {
                    f = m;
                    ++f;
                }
            }
            return f;
        }
    
      public:
        template <typename CompatibleKey>
        std::pair<const_iterator, const_iterator> equal_range(CompatibleKey const& key) const {
            auto lb = lower_bound(key);
            return { lb, upper_bound(key, lb) };
        }
    
    };
    
    #include <iostream>
    
    int main() {
        io::mapped_file_source map("input.txt");
        text_multi_lookup<double, unsigned int> tml(map.data(), map.data() + map.size());
    
        auto const e = tml.end();
    
        for(auto&& line : tml)
        {
            std::cout << line.str();
    
            auto er = tml.equal_range(line.key);
    
            if (er.first  != e) std::cout << " lower: " << er.first->str();
            if (er.second != e) std::cout << " upper: " << er.second->str();
        }
    }
    

出于好奇:这是反汇编。请注意所有算法内容是如何直接内联到 main 中的:http://paste.ubuntu.com/9946135/