How to read data from a file and pass it to the FPGrowth algorithm in Spark/PySpark
I am trying to read data from a file (items are comma-separated) and pass it to the FPGrowth algorithm using PySpark.
My code so far:
import pyspark
from pyspark import SparkContext
sc = SparkContext("local", "Assoc Rules", pyFiles=[])
txt = sc.textFile("step3.basket")
data = txt.map(lambda line: line.split(",")).collect()
rdd = sc.parallelize(data, 2)
from pyspark.ml.fpm import FPGrowth
fpg = FPGrowth(minSupport=0.02, minConfidence=0.6)
model = fpg.fit(rdd)
But when I try to run the code, I get the following error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-3-d34039dccad5> in <module>()
2
3 fpg = FPGrowth(minSupport=0.02, minConfidence=0.6)
----> 4 model = fpg.fit(rdd)
~/local/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
62 return self.copy(params)._fit(dataset)
63 else:
---> 64 return self._fit(dataset)
65 else:
66 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
~/local/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
263
264 def _fit(self, dataset):
--> 265 java_model = self._fit_java(dataset)
266 return self._create_model(java_model)
267
~/local/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
260 """
261 self._transfer_params_to_java()
--> 262 return self._java_obj.fit(dataset._jdf)
263
264 def _fit(self, dataset):
AttributeError: 'RDD' object has no attribute '_jdf'
What am I doing wrong, and how can I fix it?
FPGrowth from pyspark.ml.fpm takes a PySpark DataFrame, not an RDD. Convert the RDD into a DataFrame and then pass it in. See http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.fpm.FPGrowth.fit
Alternatively, import FPGrowth from mllib:
from pyspark.mllib.fpm import FPGrowth
Edit:
You can proceed in two ways.
1. Using the RDD approach
Taken directly from the documentation:
from pyspark.mllib.fpm import FPGrowth
txt = sc.textFile("step3.basket").map(lambda line: line.split(","))
# txt is already an RDD, so there is no need to
# collect() it and parallelize it again
model = FPGrowth.train(txt, minSupport=0.2, numPartitions=10)  # adjust parameters as needed
# the model is ready
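Once trained, the mllib model exposes the mined itemsets through freqItemsets(), which returns an RDD of FreqItemset records. A minimal sketch of inspecting the results (the printing loop is just an illustration):

result = model.freqItemsets().collect()  # each element has .items and .freq
for itemset in result:
    print(itemset.items, itemset.freq)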
2. Using a DataFrame (I recommend this as the better approach)
from pyspark.ml.fpm import FPGrowth
# .toDF() requires an active SparkSession
df = sc.textFile("step3.basket") \
       .map(lambda line: (line.split(","),)) \
       .toDF(["items"])
fp = FPGrowth(minSupport=0.2, minConfidence=0.7)
model = fp.fit(df) #model is ready!
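The fitted object is an ml FPGrowthModel, which exposes the results as DataFrames. A short sketch of the typical follow-up calls (the exact output columns depend on your Spark version; this matches the 2.2.0 docs linked above):

model.freqItemsets.show()      # frequent itemsets and their counts
model.associationRules.show()  # antecedent, consequent, confidence
model.transform(df).show()     # adds a prediction column of suggested consequents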