读取大型 mongodb 数据
Read large mongodb data
我有一个 java 应用程序需要从 MongoDB 3.2 读取大量数据并将其传输到 Hadoop。
此批处理应用程序 运行 每 4 小时一次,每天 6 次。
数据规格:
- 文档:一次 80000 个(每 4 小时)
- 大小:3gb
目前我正在使用 MongoTemplate 和 Morphia 来访问 MongoDB。
但是,在使用以下命令处理此数据时出现 OOM 异常:
List<MYClass> datalist = datasource.getCollection("mycollection").find().asList();
读取此数据并填充到 Hadoop 的最佳方法是什么?
MongoTemplate::Stream()
并一一写入Hadoop?
batchSize(someLimit)
并将整个批次写入 Hadoop?
Cursor.batch()
并一一写入hdfs?
你的问题出在asList()
调用
这会强制驱动程序遍历整个游标(80,000 个文档,几个 Gig),将所有内容保存在内存中。
batchSize(someLimit)
和 Cursor.batch()
在您遍历整个游标时无济于事,无论批处理大小是多少。
相反,您可以:
1) 迭代游标:List<MYClass> datalist = datasource.getCollection("mycollection").find()
2) 一次读取一个文档并将文档送入缓冲区(假设是一个列表)
3) 每 1000 个文档(比方说)调用 Hadoop API,清除缓冲区,然后重新开始。
asList()
调用将尝试将整个 Mongodb 集合加载到内存中。试图使内存列表对象大于 3gb 的大小。
用游标迭代集合将解决这个问题。您可以使用数据源 class 执行此操作,但我更喜欢 Morphia 提供的类型安全抽象 DAO classes:
class Dao extends BasicDAO<Order, String> {
Dao(Datastore ds) {
super(Order.class, ds);
}
}
Datastore ds = morphia.createDatastore(mongoClient, DB_NAME);
Dao dao = new Dao(ds);
Iterator<> iterator = dao.find().fetch();
while (iterator.hasNext()) {
Order order = iterator.next;
hadoopStrategy.add(order);
}
我有一个 java 应用程序需要从 MongoDB 3.2 读取大量数据并将其传输到 Hadoop。
此批处理应用程序 运行 每 4 小时一次,每天 6 次。
数据规格:
- 文档:一次 80000 个(每 4 小时)
- 大小:3gb
目前我正在使用 MongoTemplate 和 Morphia 来访问 MongoDB。 但是,在使用以下命令处理此数据时出现 OOM 异常:
List<MYClass> datalist = datasource.getCollection("mycollection").find().asList();
读取此数据并填充到 Hadoop 的最佳方法是什么?
MongoTemplate::Stream()
并一一写入Hadoop?batchSize(someLimit)
并将整个批次写入 Hadoop?Cursor.batch()
并一一写入hdfs?
你的问题出在asList()
调用
这会强制驱动程序遍历整个游标(80,000 个文档,几个 Gig),将所有内容保存在内存中。
batchSize(someLimit)
和 Cursor.batch()
在您遍历整个游标时无济于事,无论批处理大小是多少。
相反,您可以:
1) 迭代游标:List<MYClass> datalist = datasource.getCollection("mycollection").find()
2) 一次读取一个文档并将文档送入缓冲区(假设是一个列表)
3) 每 1000 个文档(比方说)调用 Hadoop API,清除缓冲区,然后重新开始。
asList()
调用将尝试将整个 Mongodb 集合加载到内存中。试图使内存列表对象大于 3gb 的大小。
用游标迭代集合将解决这个问题。您可以使用数据源 class 执行此操作,但我更喜欢 Morphia 提供的类型安全抽象 DAO classes:
class Dao extends BasicDAO<Order, String> {
Dao(Datastore ds) {
super(Order.class, ds);
}
}
Datastore ds = morphia.createDatastore(mongoClient, DB_NAME);
Dao dao = new Dao(ds);
Iterator<> iterator = dao.find().fetch();
while (iterator.hasNext()) {
Order order = iterator.next;
hadoopStrategy.add(order);
}