如何高效地从mongodb中读取数据并将其转换为spark的dataframe?

How to efficiently read data from mongodb and convert it into spark's dataframe?

我已经研究了很多但找不到解决方案。我在这里能找到的最接近的问题是 Why my SPARK works very slowly with mongoDB.

我正在尝试使用 mongo-hadoop 连接器将 mongodb 集合加载到 spark 的 DataFrame 中。这是一段相关代码:

connection_string = 'mongodb://%s:%s/randdb.%s'%(dbhost, dbport, collection_name)
trainrdd = sc.mongoRDD(connection_string, config=config)
#     traindf = sqlcontext.createDataFrame(trainrdd)
#     traindf = sqlcontext.read.json(trainrdd)
traindf = sqlcontext.jsonRDD(trainrdd) 

这里,'sc'是SparkContext对象。我还尝试了代码中注释掉的变体。但一切都同样缓慢。对于大小为 2GB(100000 行和 1000 列)的集合,在 3 台机器的集群上大约需要 6 个小时(天啊 :/),每台机器有 12 个内核和 72 GB RAM(使用这个 spark 集群中的所有内核)。 Mongodb 服务器也在其中一台机器上 运行。

我不确定我是否做对了。关于如何优化此代码的任何指示都会非常有帮助。

默认情况下 pyspark.sql.SQLContext.jsonRDD 将动态推断给定 JSON 数据集的架构。将在找到新的 JSON 字段时添加列。这可能会很慢,因为每个 JSON 属性都会被检查。特别是如果你有 1000 列。

如果数据已知,您可以改为显式定义架构,只需要一组特定的字段。

此外,由于 ObjectId HADOOP-277 中描述的问题,您需要删除包含此类不兼容类型的字段,或转换为其他类型。即 str(ObjectId(...))

例如:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType
import pymongo_spark 
pymongo_spark.activate()
data_rdd = sc.mongoRDD("mongodb://localhost:27017/database.collection")
sqlcontext = SQLContext(sc)

# Define your schema explicitly
schema = StructType([StructField("firstname", StringType()),
                     StructField("lastname", StringType()),
                     StructField("description", StringType())])

# Create a mapper function to return only the fields wanted, or to convert. 
def project(doc):
    return {"firstname": str(doc["firstname"]), 
            "lastname": str(doc["lastname"]), 
            "description": str(doc["description"])}

projected_rdd = data_rdd.map(project)
train_df = sqlcontext.jsonRDD(projected_rdd, schema)
train_df.first()

以上代码片段在环境中进行了测试:Spark v1.6.1, mongo-hadoop spark v1.5.2

使用 pyspark 从 mongo 读取数据的有效方法是使用 MongoDb spark connector

from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext
sc = SparkContext()
spark = SparkSession(sc)
data = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.input.uri","mongodb://+username:password@server_details:27017/db_name.collection_name?authSource=admin").load()

这将是 spark 数据帧,无需转换 it.You 只需要配置 mongodb spark 连接器。

如果您使用的是笔记本,请将此写在顶部-

 %%configure
{"conf": {"spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2"}}

如果您使用的是 spark-submit 命令:

spark-submit --conf spark.pyspark.python=/usr/bin/anaconda/envs/py35/bin/python3.5 --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.1 file_name.py

如果你想把它写回 mangoDB,试试:

data.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option("spark.mongodb.output.uri","mongodb://+username:password@server_details:27017/db_name.collection_name?authSource=admin").save()