应用 pyspark ALS 时出现 StackOverflow 错误 "recommendProductsForUsers"(尽管 >300GB Ram 的集群可用)
StackOverflow-error when applying pyspark ALS's "recommendProductsForUsers" (although cluster of >300GB Ram available)
正在寻找专业知识来指导我解决以下问题。
背景:
- 我正在尝试使用受 this 启发的基本 PySpark 脚本
示例
- 作为部署基础设施,我使用 Google Cloud Dataproc 集群。
- 我代码中的基石是 "recommendProductsForUsers" 记录的 here 函数,它返回模型中所有用户的前 X 产品
我遇到的问题
ALS.Train 脚本 运行 在 GCP 上运行流畅且扩展性好(轻松超过 100 万客户)。
但是,应用预测:即使用函数 'PredictAll' 或 'recommendProductsForUsers',根本无法缩放。我的脚本 运行 对于小型数据集(<100 个客户
<100 种产品)。但是,当将其调整为与业务相关的规模时,我无法扩展它(例如,>50k 的客户和>10k 的产品)
我得到的错误如下:
16/08/16 14:38:56 WARN org.apache.spark.scheduler.TaskSetManager:
Lost task 22.0 in stage 411.0 (TID 15139,
productrecommendation-high-w-2.c.main-nova-558.internal):
java.lang.WhosebugError
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
我什至弄了一个 300GB 的集群(1 个 108GB 的主节点 + 2 个 108GB RAM 的节点)来尝试 运行 它;它适用于 50,000 个客户,但仅此而已
我的目标是建立一个可以 运行 超过 80 万客户的设置
详情
失败的代码行
predictions = model.recommendProductsForUsers(10).flatMap(lambda p: p[1]).map(lambda p: (str(p[0]), str(p[1]), float(p[2])))
pprint.pprint(predictions.take(10))
schema = StructType([StructField("customer", StringType(), True), StructField("sku", StringType(), True), StructField("prediction", FloatType(), True)])
dfToSave = sqlContext.createDataFrame(predictions, schema).dropDuplicates()
您建议如何进行?我觉得我的脚本末尾的 'merging' 部分(即当我将它写入 dfToSave 时)导致了错误;有没有办法绕过这个并逐部分保存?
从堆栈跟踪来看,这似乎是与
相同的问题
基本上,Spark 递归地表示 RDD 沿袭,这样当在迭代工作负载过程中没有对事物进行惰性评估时,您最终会得到深度嵌套的对象。调用 sc.setCheckpointDir 并调整检查点间隔将减少此 RDD 谱系的长度。
正在寻找专业知识来指导我解决以下问题。
背景:
- 我正在尝试使用受 this 启发的基本 PySpark 脚本 示例
- 作为部署基础设施,我使用 Google Cloud Dataproc 集群。
- 我代码中的基石是 "recommendProductsForUsers" 记录的 here 函数,它返回模型中所有用户的前 X 产品
我遇到的问题
ALS.Train 脚本 运行 在 GCP 上运行流畅且扩展性好(轻松超过 100 万客户)。
但是,应用预测:即使用函数 'PredictAll' 或 'recommendProductsForUsers',根本无法缩放。我的脚本 运行 对于小型数据集(<100 个客户 <100 种产品)。但是,当将其调整为与业务相关的规模时,我无法扩展它(例如,>50k 的客户和>10k 的产品)
我得到的错误如下:
16/08/16 14:38:56 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 22.0 in stage 411.0 (TID 15139, productrecommendation-high-w-2.c.main-nova-558.internal): java.lang.WhosebugError at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
我什至弄了一个 300GB 的集群(1 个 108GB 的主节点 + 2 个 108GB RAM 的节点)来尝试 运行 它;它适用于 50,000 个客户,但仅此而已
我的目标是建立一个可以 运行 超过 80 万客户的设置
详情
失败的代码行
predictions = model.recommendProductsForUsers(10).flatMap(lambda p: p[1]).map(lambda p: (str(p[0]), str(p[1]), float(p[2])))
pprint.pprint(predictions.take(10))
schema = StructType([StructField("customer", StringType(), True), StructField("sku", StringType(), True), StructField("prediction", FloatType(), True)])
dfToSave = sqlContext.createDataFrame(predictions, schema).dropDuplicates()
您建议如何进行?我觉得我的脚本末尾的 'merging' 部分(即当我将它写入 dfToSave 时)导致了错误;有没有办法绕过这个并逐部分保存?
从堆栈跟踪来看,这似乎是与
基本上,Spark 递归地表示 RDD 沿袭,这样当在迭代工作负载过程中没有对事物进行惰性评估时,您最终会得到深度嵌套的对象。调用 sc.setCheckpointDir 并调整检查点间隔将减少此 RDD 谱系的长度。