Recommendation for subset of users using Pyspark mllib ALS/MatrixFactorizationModel
I have built a model using the code below:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
model1 = ALS.train(ratings=ratingsR, rank=model_params['rank'],
                   iterations=model_params['iterations'], lambda_=model_params['lambda'],
                   blocks=model_params['blocks'], nonnegative=model_params['nonnegative'],
                   seed=model_params['seed'])
Now I want to predict activity for all users (or a subset of users) using the distributed environment Spark provides.
I tried recommendProductsForUsers, but it is taking forever to give me 32M users X 4000 products:
preds = model1.recommendProductsForUsers(num=4000)
I really don't need recommendations for all 32M users; 100k-200k users would be fine for me as well.
So, to modify my process, I tried a Spark UDF that handles one user at a time, but still uses the distribution mechanism of the Spark cluster:
import pyspark.sql.functions as F
def udf_preds(sameModel):
    return F.udf(lambda x: get_predictions(x, sameModel))

def get_predictions(x, sameModel):
    preds = sameModel.recommendProducts(user=x, num=4000)  # per user it takes around 4s
    return preds
test.withColumn('predictions', udf_preds(model1)(F.col('user_id')))
test contains about 200,000 users.
The above fails with the following error:
PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
How can I better perform recommendations for this subset of users?
(EDIT)
Following the response from @piscall, I tried to do the same thing using an RDD:
preds_rdd = test.rdd.map(lambda x: (x.user_id, sameModel.recommendProducts(x.user_id, 4000)))
preds_rdd.take(2)
File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 330, in __getnewargs__
"It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
PicklingErrorTraceback (most recent call last)
<ipython-input-17-e114800a26e7> in <module>()
----> 1 preds_rdd.take(2)
/usr/hdp/current/spark2-client/python/pyspark/rdd.py in take(self, num)
1356
1357 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1358 res = self.context.runJob(self, takeUpToNumLeft, p)
1359
1360 items += res
/usr/hdp/current/spark2-client/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
1038 # SparkContext#runJob.
1039 mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1040 sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
1041 return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
1042
/usr/hdp/current/spark2-client/python/pyspark/rdd.py in _jrdd(self)
2470
2471 wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2472 self._jrdd_deserializer, profiler)
2473 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
2474 self.preservesPartitioning)
/usr/hdp/current/spark2-client/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
2403 assert serializer, "serializer should not be empty"
2404 command = (func, profiler, deserializer, serializer)
-> 2405 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
2406 return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
2407 sc.pythonVer, broadcast_vars, sc._javaAccumulator)
/usr/hdp/current/spark2-client/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
2389 # the serialized command will be compressed by broadcast
2390 ser = CloudPickleSerializer()
-> 2391 pickled_command = ser.dumps(command)
2392 if len(pickled_command) > (1 << 20): # 1M
2393 # The broadcast will have same life cycle as created PythonRDD
/usr/hdp/current/spark2-client/python/pyspark/serializers.py in dumps(self, obj)
573
574 def dumps(self, obj):
--> 575 return cloudpickle.dumps(obj, 2)
576
577
/usr/hdp/current/spark2-client/python/pyspark/cloudpickle.py in dumps(obj, protocol)
916
917 cp = CloudPickler(file,protocol)
--> 918 cp.dump(obj)
919
920 return file.getvalue()
/usr/hdp/current/spark2-client/python/pyspark/cloudpickle.py in dump(self, obj)
247 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
248 print_exec(sys.stderr)
--> 249 raise pickle.PicklingError(msg)
250
251
PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
How about preds = sameModel.predict(x)? I don't know Spark or even Scala, but this is how we do it in Python, and I suppose it's the same in Spark. If you want to predict with a subsample of x, then you could do something like preds = sameModel.predict(x[0:200, ::]), or vice versa for the columns.
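For reference, PySpark's MatrixFactorizationModel does not take a matrix and does not support slicing like that: predict() scores a single (user, product) pair and predictAll() scores an RDD of such pairs. A minimal sketch, assuming model1 from the question, sc as the active SparkContext, and made-up IDs:

# Score one (user, product) pair on the driver
score = model1.predict(1, 10)

# Score many pairs at once; predictAll takes an RDD of (user, product) tuples
pairs = sc.parallelize([(1, 10), (1, 20), (2, 10)])
scores = model1.predictAll(pairs).collect()  # list of Rating(user, product, rating)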
You could try something like the following:
"""
Convert the test df to rdd, and then map a function that returns
(user_id, array(Rating(user, product, rating)))
"""
preds_rdd = test.rdd.map(lambda x: (x.user_id, sameModel.recommendProducts(x.user_id, 4000)))
# Convert (user_id, array(Rating(user, product, rating))) to
# (user_id, array(product_names))
preds_rdd2 = preds_rdd.map(lambda row: (row[0], [x.product for x in row[1]]))
# Convert above RDD to DF with user_id and predicted_products columns
preds_df = preds_rdd2.toDF(["user_id", "predicted_products"])
I haven't tested it, but it follows straight from the documentation here:
You can then choose to join it back to the original dataframe, or just keep these columns.
Then, if needed, you can use explode() to break the array of products into separate rows.
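As a rough sketch of that explode() step, assuming preds_df from the snippet above can be materialized (one row per user_id with an array of products):

import pyspark.sql.functions as F

# Turn each (user_id, [products]) row into one row per (user_id, product) pair
exploded_df = preds_df.select('user_id', F.explode('predicted_products').alias('product'))
exploded_df.show(5)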
What I would do is use the predictAll method. Say df_products is a dataframe containing all 4,000 products and df_users is a dataframe containing the 100-200K selected users; do a crossJoin to get all combinations of the two datasets as the test data, and then run predictAll, which yields Rating objects for the selected users across the 4,000 products:
from pyspark.sql.functions import broadcast
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
testdata = broadcast(df_products).crossJoin(df_users).select('user', 'product').rdd.map(tuple)
model.predictAll(testdata).collect()
Using the example from the documentation, which has 4 products (1,2,3,4) and 4 users (1,2,3,4):
df_products.collect()
# [Row(product=1), Row(product=3), Row(product=2), Row(product=4)]
# a subset of all users:
df_users.collect()
# [Row(user=1), Row(user=3)]
testdata.collect()
# [(1, 1), (1, 3), (1, 2), (1, 4), (3, 1), (3, 3), (3, 2), (3, 4)]
model.predictAll(testdata).collect()
#[Rating(user=1, product=4, rating=0.9999459747142155),
# Rating(user=3, product=4, rating=4.99555263974573),
# Rating(user=1, product=1, rating=4.996821463543848),
# Rating(user=3, product=1, rating=1.000199620693615),
# Rating(user=1, product=3, rating=4.996821463543848),
# Rating(user=3, product=3, rating=1.000199620693615),
# Rating(user=1, product=2, rating=0.9999459747142155),
# Rating(user=3, product=2, rating=4.99555263974573)]
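If you only need a top-N list per user rather than a score for every product, the Rating output can be reduced per user. A rough sketch, with top 10 as an arbitrary cut-off and testdata as above:

# Group the predictions by user and keep only the highest-rated products for each user
top_per_user = (model.predictAll(testdata)
                .map(lambda r: (r.user, (r.product, r.rating)))
                .groupByKey()
                .mapValues(lambda prs: [p for p, _ in
                                        sorted(prs, key=lambda pr: pr[1], reverse=True)[:10]]))
top_per_user.collect()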
Note: you may want to filter out users that are not in the existing model before creating testdata, and handle them separately.
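A minimal sketch of that filtering idea, assuming df_users has a single user column and an active SparkSession so that toDF() works:

# userFeatures() returns (userId, factors) for every user the model was trained on
known_users_df = model.userFeatures().map(lambda uf: (uf[0],)).toDF(['user'])

# Keep only the selected users the model knows about; handle the rest (cold-start) separately
df_users_known = df_users.join(known_users_df, on='user', how='inner')
df_users_unknown = df_users.join(known_users_df, on='user', how='left_anti')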