How to extract data from Spark MLlib FP Growth model
I'm running Spark in standalone mode with a master and slaves, and no Hadoop cluster.
Using spark-shell, I can quickly build an FPGrowthModel from my data.
Once the model is built, I try to inspect the patterns and frequencies it captured, but Spark hangs at the collect() step (as seen in the Spark UI) on the larger dataset (the data is roughly a 200000 * 2000 matrix).
Here is the code I run in spark-shell:
import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}
import org.apache.spark.rdd.RDD
val textFile = sc.textFile("/path/to/txt/file")
val data = textFile.map(_.split(" ")).cache()
val fpg = new FPGrowth().setMinSupport(0.9).setNumPartitions(8)
val model = fpg.run(data)
model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}
I tried increasing the spark-shell memory from 512MB to 2GB, but that did not seem to relieve the hang. I'm not sure whether Hadoop is required for this task, whether I need to give spark-shell even more memory, or whether something else is going on.
15/08/10 22:19:40 ERROR TaskSchedulerImpl: Lost executor 0 on 142.103.22.23: remote Rpc client disassociated
15/08/10 22:19:40 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@142.103.22.23:43440] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/0 is now EXITED (Command exited with code 137)
15/08/10 22:19:40 INFO TaskSetManager: Re-queueing tasks for 0 from TaskSet 4.0
15/08/10 22:19:40 INFO SparkDeploySchedulerBackend: Executor app-20150810163957-0001/0 removed: Command exited with code 137
15/08/10 22:19:40 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 59, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 6.0 in stage 4.0 (TID 62, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 56, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 2.0 in stage 4.0 (TID 58, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 5.0 in stage 4.0 (TID 61, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 4.0 in stage 4.0 (TID 60, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 7.0 in stage 4.0 (TID 63, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 57, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor added: app-20150810163957-0001/1 on worker-20150810163259-142.103.22.23-48853 (142.103.22.23:48853) with 8 cores
15/08/10 22:19:40 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150810163957-0001/1 on hostPort 142.103.22.23:48853 with 8 cores, 15.0 GB RAM
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/1 is now LOADING
15/08/10 22:19:40 INFO DAGScheduler: Executor lost: 0 (epoch 2)
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/1 is now RUNNING
15/08/10 22:19:40 INFO BlockManagerMasterEndpoint: Trying to remove executor 0 from BlockManagerMaster.
15/08/10 22:19:40 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(0, 142.103.22.23, 37411)
15/08/10 22:19:40 INFO BlockManagerMaster: Removed 0 successfully in removeExecutor
15/08/10 22:19:40 INFO ShuffleMapStage: ShuffleMapStage 3 is now unavailable on executor 0 (0/16, false)
You should not run .collect() when the dataset is huge (several GB); it only helps speed things up when the collected results are evaluated multiple times on the driver.
Run the foreach loop without collect().
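A minimal sketch of that loop without collect(), assuming the same model value as in the question. Note that foreach on an RDD runs on the executors, so the println output ends up in the executor logs rather than on the driver console:

model.freqItemsets.foreach { itemset =>
  // runs on the executors; output goes to executor stdout logs
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}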
Kryo is a faster serializer than org.apache.spark.serializer.JavaSerializer. A possible workaround is to tell Spark not to use Kryo:
val conf = new org.apache.spark.SparkConf()
  .setAppName("APP_NAME")
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")  // fall back to the Java serializer instead of Kryo
and try running the code above again. (Note: spark-shell creates sc at startup, so the serializer setting generally needs to be supplied when launching the shell, e.g. spark-shell --conf spark.serializer=org.apache.spark.serializer.JavaSerializer.)
Refer to this link:
Try replacing collect() with a local iterator. Ultimately, you may be running into a limitation of the FPGrowth implementation. See my posting and the Spark JIRA issue.
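A minimal sketch of the local-iterator approach, assuming the same model value as in the question; toLocalIterator pulls one partition at a time to the driver instead of materializing the whole RDD at once:

model.freqItemsets.toLocalIterator.foreach { itemset =>
  // only one partition is held on the driver at any time
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}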