Mongo 使用 Pyspark(分片集群)写入时间过长
Mongo write taking too long with Pyspark (sharded cluster)
我正在尝试读取 parquet 文件并将其转储到 mongodb 集合(分片)。
当我在没有分片的情况下这样做时,写入吞吐量非常好。但是在分片之后它已经急剧下降了。
单个任务需要 30 分钟以上,仅处理 16 mb 数据
我正在使用以下 Spark 配置
(
SparkConf()
.setMaster("yarn")
.set("spark.executor.memory", "30g")
.set("spark.executor.instances", "10")
.set("spark.executor.cores", "5")
.set("spark.sql.shuffle.partitions", "2000")
.set("spark.network.timeout", "800")
.set("spark.sql.broadcastTimeout", "1200")
.set("spark.default.parallelism", "2000")
.set('spark.jars', './mongo*.jar')
.set("spark.mongodb.input.uri", mongo_uri)
.set("spark.mongodb.input.database", db)
.set("spark.mongodb.input.collection", db_collection)
.set("spark.mongodb.output.uri", mongo_uri)
.set("spark.mongodb.output.database", db)
.set("spark.mongodb.output.collection", db_collection)
.set("spark.mongodb.input.partitionerOptions.partitionKey", shard_key)
.set("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
.set("spark.mongodb.input.partitionerOptions.shardkey", shard_key)
)
我希望转储超过 200 亿条记录,8 小时后它只插入了大约 8 亿个文档。
文档大小相同,每个文档为 250 KB
没有使用额外的索引。
以防万一其他人遇到这个问题。出于某种原因,在写入 mongo.
时未使用 sparksession 级别 mongo 配置
要克服这个问题,请在您的写作步骤中明确给出 mongo 配置
mongo_conf = {
'uri': mongo_uri,
'database': db,
'maxBatchSize': 100000,
'forceInsert': True,
'ordered': False,
'shardKey': {'your_key': 'hashed'},
'collection': "test2",
}
(
df
.write
.options(**mongo_conf)
.format(mongo_format)
.mode("append").save()
)
我正在尝试读取 parquet 文件并将其转储到 mongodb 集合(分片)。 当我在没有分片的情况下这样做时,写入吞吐量非常好。但是在分片之后它已经急剧下降了。
单个任务需要 30 分钟以上,仅处理 16 mb 数据
我正在使用以下 Spark 配置
(
SparkConf()
.setMaster("yarn")
.set("spark.executor.memory", "30g")
.set("spark.executor.instances", "10")
.set("spark.executor.cores", "5")
.set("spark.sql.shuffle.partitions", "2000")
.set("spark.network.timeout", "800")
.set("spark.sql.broadcastTimeout", "1200")
.set("spark.default.parallelism", "2000")
.set('spark.jars', './mongo*.jar')
.set("spark.mongodb.input.uri", mongo_uri)
.set("spark.mongodb.input.database", db)
.set("spark.mongodb.input.collection", db_collection)
.set("spark.mongodb.output.uri", mongo_uri)
.set("spark.mongodb.output.database", db)
.set("spark.mongodb.output.collection", db_collection)
.set("spark.mongodb.input.partitionerOptions.partitionKey", shard_key)
.set("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
.set("spark.mongodb.input.partitionerOptions.shardkey", shard_key)
)
我希望转储超过 200 亿条记录,8 小时后它只插入了大约 8 亿个文档。
文档大小相同,每个文档为 250 KB
没有使用额外的索引。
以防万一其他人遇到这个问题。出于某种原因,在写入 mongo.
时未使用 sparksession 级别 mongo 配置要克服这个问题,请在您的写作步骤中明确给出 mongo 配置
mongo_conf = {
'uri': mongo_uri,
'database': db,
'maxBatchSize': 100000,
'forceInsert': True,
'ordered': False,
'shardKey': {'your_key': 'hashed'},
'collection': "test2",
}
(
df
.write
.options(**mongo_conf)
.format(mongo_format)
.mode("append").save()
)