如何高效地从mongodb中读取数据并将其转换为spark的dataframe?
How to efficiently read data from mongodb and convert it into spark's dataframe?
我已经研究了很多但找不到解决方案。我在这里能找到的最接近的问题是 Why my SPARK works very slowly with mongoDB.
我正在尝试使用 mongo-hadoop 连接器将 mongodb 集合加载到 spark 的 DataFrame 中。这是一段相关代码:
connection_string = 'mongodb://%s:%s/randdb.%s'%(dbhost, dbport, collection_name)
trainrdd = sc.mongoRDD(connection_string, config=config)
# traindf = sqlcontext.createDataFrame(trainrdd)
# traindf = sqlcontext.read.json(trainrdd)
traindf = sqlcontext.jsonRDD(trainrdd)
这里,'sc'是SparkContext对象。我还尝试了代码中注释掉的变体。但一切都同样缓慢。对于大小为 2GB(100000 行和 1000 列)的集合,在 3 台机器的集群上大约需要 6 个小时(天啊 :/),每台机器有 12 个内核和 72 GB RAM(使用这个 spark 集群中的所有内核)。 Mongodb 服务器也在其中一台机器上 运行。
我不确定我是否做对了。关于如何优化此代码的任何指示都会非常有帮助。
默认情况下 pyspark.sql.SQLContext.jsonRDD 将动态推断给定 JSON 数据集的架构。将在找到新的 JSON 字段时添加列。这可能会很慢,因为每个 JSON 属性都会被检查。特别是如果你有 1000 列。
如果数据已知或,您可以改为显式定义架构,只需要一组特定的字段。
此外,由于 ObjectId
HADOOP-277 中描述的问题,您需要删除包含此类不兼容类型的字段,或转换为其他类型。即 str(ObjectId(...))
例如:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType
import pymongo_spark
pymongo_spark.activate()
data_rdd = sc.mongoRDD("mongodb://localhost:27017/database.collection")
sqlcontext = SQLContext(sc)
# Define your schema explicitly
schema = StructType([StructField("firstname", StringType()),
StructField("lastname", StringType()),
StructField("description", StringType())])
# Create a mapper function to return only the fields wanted, or to convert.
def project(doc):
return {"firstname": str(doc["firstname"]),
"lastname": str(doc["lastname"]),
"description": str(doc["description"])}
projected_rdd = data_rdd.map(project)
train_df = sqlcontext.jsonRDD(projected_rdd, schema)
train_df.first()
以上代码片段在环境中进行了测试:Spark v1.6.1, mongo-hadoop spark v1.5.2
使用 pyspark 从 mongo 读取数据的有效方法是使用 MongoDb spark connector
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext
sc = SparkContext()
spark = SparkSession(sc)
data = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.input.uri","mongodb://+username:password@server_details:27017/db_name.collection_name?authSource=admin").load()
这将是 spark 数据帧,无需转换 it.You 只需要配置 mongodb spark 连接器。
如果您使用的是笔记本,请将此写在顶部-
%%configure
{"conf": {"spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2"}}
如果您使用的是 spark-submit 命令:
spark-submit --conf spark.pyspark.python=/usr/bin/anaconda/envs/py35/bin/python3.5 --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.1 file_name.py
如果你想把它写回 mangoDB,试试:
data.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option("spark.mongodb.output.uri","mongodb://+username:password@server_details:27017/db_name.collection_name?authSource=admin").save()
我已经研究了很多但找不到解决方案。我在这里能找到的最接近的问题是 Why my SPARK works very slowly with mongoDB.
我正在尝试使用 mongo-hadoop 连接器将 mongodb 集合加载到 spark 的 DataFrame 中。这是一段相关代码:
connection_string = 'mongodb://%s:%s/randdb.%s'%(dbhost, dbport, collection_name)
trainrdd = sc.mongoRDD(connection_string, config=config)
# traindf = sqlcontext.createDataFrame(trainrdd)
# traindf = sqlcontext.read.json(trainrdd)
traindf = sqlcontext.jsonRDD(trainrdd)
这里,'sc'是SparkContext对象。我还尝试了代码中注释掉的变体。但一切都同样缓慢。对于大小为 2GB(100000 行和 1000 列)的集合,在 3 台机器的集群上大约需要 6 个小时(天啊 :/),每台机器有 12 个内核和 72 GB RAM(使用这个 spark 集群中的所有内核)。 Mongodb 服务器也在其中一台机器上 运行。
我不确定我是否做对了。关于如何优化此代码的任何指示都会非常有帮助。
默认情况下 pyspark.sql.SQLContext.jsonRDD 将动态推断给定 JSON 数据集的架构。将在找到新的 JSON 字段时添加列。这可能会很慢,因为每个 JSON 属性都会被检查。特别是如果你有 1000 列。
如果数据已知或,您可以改为显式定义架构,只需要一组特定的字段。
此外,由于 ObjectId
HADOOP-277 中描述的问题,您需要删除包含此类不兼容类型的字段,或转换为其他类型。即 str(ObjectId(...))
例如:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType
import pymongo_spark
pymongo_spark.activate()
data_rdd = sc.mongoRDD("mongodb://localhost:27017/database.collection")
sqlcontext = SQLContext(sc)
# Define your schema explicitly
schema = StructType([StructField("firstname", StringType()),
StructField("lastname", StringType()),
StructField("description", StringType())])
# Create a mapper function to return only the fields wanted, or to convert.
def project(doc):
return {"firstname": str(doc["firstname"]),
"lastname": str(doc["lastname"]),
"description": str(doc["description"])}
projected_rdd = data_rdd.map(project)
train_df = sqlcontext.jsonRDD(projected_rdd, schema)
train_df.first()
以上代码片段在环境中进行了测试:Spark v1.6.1, mongo-hadoop spark v1.5.2
使用 pyspark 从 mongo 读取数据的有效方法是使用 MongoDb spark connector
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext
sc = SparkContext()
spark = SparkSession(sc)
data = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.input.uri","mongodb://+username:password@server_details:27017/db_name.collection_name?authSource=admin").load()
这将是 spark 数据帧,无需转换 it.You 只需要配置 mongodb spark 连接器。
如果您使用的是笔记本,请将此写在顶部-
%%configure
{"conf": {"spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2"}}
如果您使用的是 spark-submit 命令:
spark-submit --conf spark.pyspark.python=/usr/bin/anaconda/envs/py35/bin/python3.5 --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.1 file_name.py
如果你想把它写回 mangoDB,试试:
data.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option("spark.mongodb.output.uri","mongodb://+username:password@server_details:27017/db_name.collection_name?authSource=admin").save()