Mongo 使用 Pyspark(分片集群)写入时间过长

Mongo write taking too long with Pyspark (sharded cluster)

我正在尝试读取 parquet 文件并将其转储到 mongodb 集合(分片)。 当我在没有分片的情况下这样做时,写入吞吐量非常好。但是在分片之后它已经急剧下降了。

单个任务需要 30 分钟以上,仅处理 16 mb 数据

我正在使用以下 Spark 配置

(
     SparkConf()
    .setMaster("yarn")
    .set("spark.executor.memory", "30g")
    .set("spark.executor.instances", "10")
    .set("spark.executor.cores", "5")
    .set("spark.sql.shuffle.partitions", "2000")
    .set("spark.network.timeout", "800")
    .set("spark.sql.broadcastTimeout", "1200")
    .set("spark.default.parallelism", "2000") 
    .set('spark.jars', './mongo*.jar')
    .set("spark.mongodb.input.uri", mongo_uri)
    .set("spark.mongodb.input.database", db)
    .set("spark.mongodb.input.collection", db_collection)
    .set("spark.mongodb.output.uri", mongo_uri)
    .set("spark.mongodb.output.database", db)
    .set("spark.mongodb.output.collection", db_collection)
    .set("spark.mongodb.input.partitionerOptions.partitionKey", shard_key)
    .set("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
    .set("spark.mongodb.input.partitionerOptions.shardkey", shard_key) 
)

我希望转储超过 200 亿条记录,8 小时后它只插入了大约 8 亿个文档。

文档大小相同,每个文档为 250 KB

没有使用额外的索引。

以防万一其他人遇到这个问题。出于某种原因,在写入 mongo.

时未使用 sparksession 级别 mongo 配置

要克服这个问题,请在您的写作步骤中明确给出 mongo 配置

mongo_conf = {
    'uri': mongo_uri,
    'database': db,
    'maxBatchSize': 100000,
    'forceInsert': True,
    'ordered': False,
    'shardKey': {'your_key': 'hashed'},
    'collection': "test2",
}

(
        df
        .write
        .options(**mongo_conf)
        .format(mongo_format)
        .mode("append").save()
)