将大型 sparknlp 管道加载到 Apache Spark 批处理作业中花费的时间太长
Loading large sparknlp pipeline into Apache Spark batch job taking too long
我正在使用 johnsnowlabs 的 SparkNLP 从我的文本数据中提取嵌入,下面是管道。保存到hdfs后模型大小为1.8g
embeddings = BertSentenceEmbeddings.pretrained("labse", "xx") \
.setInputCols("sentence") \
.setOutputCol("sentence_embeddings")
nlp_pipeline = Pipeline(stages=[document_assembler, sentence_detector, embeddings])
pipeline_model = nlp_pipeline.fit(spark.createDataFrame([[""]]).toDF("text"))
我使用 pipeline_model.save("hdfs:///<path>")
将 pipeline_model
保存到 HDFS
。
上面只执行了一次
在另一个脚本中,我使用 pipeline_model = PretrainedPipeline.from_disk("hdfs:///<path>")
从 HDFS
加载存储的管道。
上面的代码加载了模型但占用了太多资源。我在 spark 本地模型(无集群)上测试了它,但我有高资源 94g RAM,32 核。
后来,我用 12 个执行器在 yarn 上部署了脚本,每个执行器有 3 个内核和 7g ram。我给驱动分配了10g的内存。
仅从 HDFS 加载保存的模型,脚本再次花费太多时间。
当火花到达这一点时(见上面的截图),它花费了太多时间
我想到了一个办法
正在预加载
我认为的方法是以某种方式将模型一次预加载到内存中,当脚本想要在数据帧上应用转换时,我可以以某种方式调用对预训练管道的引用并随时使用它,没有做任何磁盘 i/o。我找了找也找不到。
请让我知道您对这个解决方案的看法以及实现它的最佳方法。
YARN 上的资源
节点名称
计数
RAM(每个)
核心(每个)
主节点
1
38g
8
辅助节点
1
38 克
8
工作节点
4
24 克
4
总计
6
172g
32
谢谢
如评论中所述,这是基于 PyTorch 而非 SparkNLP 的解决方案。简化代码:
# labse_spark.py
LABSE_MODEL, LABSE_TOKENIZER = None
def transform(spark, df, input_col='text', output_col='output'):
spark.sparkContext.addFile('hdfs:///path/to/labse_model')
output_schema = T.StructType(df.schema.fields + [T.StructField(output_col, T.ArrayType(T.FloatType()))])
rdd = df.rdd.mapPartitions(_map_partitions_func(input_col, output_col))
res = spark.createDataFrame(data=rdd, schema=output_schema)
return res
def _map_partitions_func(input_col, output_col):
def executor_func(rows):
# load everything to memory (partitions should be small, ~1k rows per partition):
pandas_df = pd.DataFrame([r.asDict() for r in rows])
global LABSE_MODEL, LABSE_TOKENIZER
if not (LABSE_TOKENIZER or LABSE_MODEL): # should happen once per executor core
LABSE_TOKENIZER = AutoTokenizer.from_pretrained(SparkFiles.get('labse_model'))
LABSE_MODEL = AutoModel.from_pretrained(SparkFiles.get('labse_model'))
# copied from HF model card:
encoded_input = LABSE_TOKENIZER(
pandas_df[input_col].tolist(), padding=True, truncation=True, max_length=64, return_tensors='pt')
with torch.no_grad():
model_output = LABSE_MODEL(**encoded_input)
embeddings = model_output.pooler_output
embeddings = torch.nn.functional.normalize(embeddings)
pandas_df[output_col] = pd.Series(embeddings.tolist())
return pandas_df.to_dict('records')
return executor_func
我正在使用 johnsnowlabs 的 SparkNLP 从我的文本数据中提取嵌入,下面是管道。保存到hdfs后模型大小为1.8g
embeddings = BertSentenceEmbeddings.pretrained("labse", "xx") \
.setInputCols("sentence") \
.setOutputCol("sentence_embeddings")
nlp_pipeline = Pipeline(stages=[document_assembler, sentence_detector, embeddings])
pipeline_model = nlp_pipeline.fit(spark.createDataFrame([[""]]).toDF("text"))
我使用 pipeline_model.save("hdfs:///<path>")
将 pipeline_model
保存到 HDFS
。
上面只执行了一次
在另一个脚本中,我使用 pipeline_model = PretrainedPipeline.from_disk("hdfs:///<path>")
从 HDFS
加载存储的管道。
上面的代码加载了模型但占用了太多资源。我在 spark 本地模型(无集群)上测试了它,但我有高资源 94g RAM,32 核。
后来,我用 12 个执行器在 yarn 上部署了脚本,每个执行器有 3 个内核和 7g ram。我给驱动分配了10g的内存。
仅从 HDFS 加载保存的模型,脚本再次花费太多时间。
当火花到达这一点时(见上面的截图),它花费了太多时间
我想到了一个办法
正在预加载
我认为的方法是以某种方式将模型一次预加载到内存中,当脚本想要在数据帧上应用转换时,我可以以某种方式调用对预训练管道的引用并随时使用它,没有做任何磁盘 i/o。我找了找也找不到。
请让我知道您对这个解决方案的看法以及实现它的最佳方法。
YARN 上的资源
节点名称 | 计数 | RAM(每个) | 核心(每个) |
---|---|---|---|
主节点 | 1 | 38g | 8 |
辅助节点 | 1 | 38 克 | 8 |
工作节点 | 4 | 24 克 | 4 |
总计 | 6 | 172g | 32 |
谢谢
如评论中所述,这是基于 PyTorch 而非 SparkNLP 的解决方案。简化代码:
# labse_spark.py
LABSE_MODEL, LABSE_TOKENIZER = None
def transform(spark, df, input_col='text', output_col='output'):
spark.sparkContext.addFile('hdfs:///path/to/labse_model')
output_schema = T.StructType(df.schema.fields + [T.StructField(output_col, T.ArrayType(T.FloatType()))])
rdd = df.rdd.mapPartitions(_map_partitions_func(input_col, output_col))
res = spark.createDataFrame(data=rdd, schema=output_schema)
return res
def _map_partitions_func(input_col, output_col):
def executor_func(rows):
# load everything to memory (partitions should be small, ~1k rows per partition):
pandas_df = pd.DataFrame([r.asDict() for r in rows])
global LABSE_MODEL, LABSE_TOKENIZER
if not (LABSE_TOKENIZER or LABSE_MODEL): # should happen once per executor core
LABSE_TOKENIZER = AutoTokenizer.from_pretrained(SparkFiles.get('labse_model'))
LABSE_MODEL = AutoModel.from_pretrained(SparkFiles.get('labse_model'))
# copied from HF model card:
encoded_input = LABSE_TOKENIZER(
pandas_df[input_col].tolist(), padding=True, truncation=True, max_length=64, return_tensors='pt')
with torch.no_grad():
model_output = LABSE_MODEL(**encoded_input)
embeddings = model_output.pooler_output
embeddings = torch.nn.functional.normalize(embeddings)
pandas_df[output_col] = pd.Series(embeddings.tolist())
return pandas_df.to_dict('records')
return executor_func