在 Worker 的帮助下从 Spark 读取大量 MongoDB 集合
Reading Huge MongoDB collection from Spark with help of Worker
我想从 Spark 读取一个巨大的 MongoDB 集合创建一个持久的 RDD 并对其进行进一步的数据分析。
有什么方法可以更快地从 MongoDB 读取数据。
尝试过 MongoDB Java + Casbah
的方法
我可以使用 worker/slave 从 MongoDB 并行读取数据,然后将其保存为持久数据并使用它吗?
有两种方法可以将数据从 MongoDB 获取到 Apache Spark。
方法一:
使用 Casbah(MongDB Java 驱动层)
val uriRemote = MongoClientURI("mongodb://RemoteURL:27017/")
val mongoClientRemote = MongoClient(uriRemote)
val dbRemote = mongoClientRemote("dbName")
val collectionRemote = dbRemote("collectionName")
val ipMongo = collectionRemote.find
val ipRDD = sc.makeRDD(ipMongo.toList)
ipRDD.saveAsTextFile("hdfs://path/to/hdfs")
这里我们使用Scala和Casbah先获取数据,然后保存到HDFS。
方法二:我们使用的Spark Worker
更好的代码版本:使用Spark worker和多核来在短时间内获取数据。
val config = new Configuration()
config.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat")
config.set("mongo.input.uri", "mongodb://RemoteURL:27017/dbName.collectionName")
val keyClassName = classOf[Object]
val valueClassName = classOf[BSONObject]
val inputFormatClassName = classOf[com.mongodb.hadoop.MongoInputFormat]
val ipRDD = sc.newAPIHadoopRDD(config,inputFormatClassName,keyClassName,valueClassName)
ipRDD.saveAsTextFile("hdfs://path/to/hdfs")
我想从 Spark 读取一个巨大的 MongoDB 集合创建一个持久的 RDD 并对其进行进一步的数据分析。
有什么方法可以更快地从 MongoDB 读取数据。 尝试过 MongoDB Java + Casbah
的方法我可以使用 worker/slave 从 MongoDB 并行读取数据,然后将其保存为持久数据并使用它吗?
有两种方法可以将数据从 MongoDB 获取到 Apache Spark。
方法一: 使用 Casbah(MongDB Java 驱动层)
val uriRemote = MongoClientURI("mongodb://RemoteURL:27017/")
val mongoClientRemote = MongoClient(uriRemote)
val dbRemote = mongoClientRemote("dbName")
val collectionRemote = dbRemote("collectionName")
val ipMongo = collectionRemote.find
val ipRDD = sc.makeRDD(ipMongo.toList)
ipRDD.saveAsTextFile("hdfs://path/to/hdfs")
这里我们使用Scala和Casbah先获取数据,然后保存到HDFS。
方法二:我们使用的Spark Worker
更好的代码版本:使用Spark worker和多核来在短时间内获取数据。
val config = new Configuration()
config.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat")
config.set("mongo.input.uri", "mongodb://RemoteURL:27017/dbName.collectionName")
val keyClassName = classOf[Object]
val valueClassName = classOf[BSONObject]
val inputFormatClassName = classOf[com.mongodb.hadoop.MongoInputFormat]
val ipRDD = sc.newAPIHadoopRDD(config,inputFormatClassName,keyClassName,valueClassName)
ipRDD.saveAsTextFile("hdfs://path/to/hdfs")