在pyspark中按列稀疏到密集阵列
Sparse by column to dense array in pyspark
我有两个数据框,我需要从中获取信息以生成第三个。第一个数据框包含用户对项目迭代的信息,例如
+-----+-----------+-----------+
|user | itemId |date |
+-----+-----------+-----------+
|1 | 10005880 |2019-07-23 |
|2 | 10005903 |2019-07-23 |
|3 | 10005903 |2019-07-23 |
|1 | 12458442 |2019-07-23 |
|1 | 10005903 |2019-07-26 |
|3 | 12632813 |2019-07-26 |
|2 | 12632813 |2019-07-26 |
+-----+-----------+-----------+
没有特定的顺序,每个用户有多行。
第二个数据框只是一个带有索引的项目列表,例如,
+-----------+-----------+
| itemId |index |
+-----------+-----------+
| 10005880 |1 |
| 10005903 |2 |
| 12458442 |3 |
| ... | ... |
| 12632813 |2000000 |
+-----------+-----------+
这个数据框很长,并不是每个项目都在项目交互数据框中表示。需要的是第三个数据框,其中每一行都包含用户项目交互的矢量化表示,作为单列中的数组,例如
+-----+--------------------+
|user | interactions |
+-----+--------------------+
|1 | <1, 1, 1, ..., 0> |
|2 | <0, 1, 0, ..., 1> |
|3 | <0, 1, 0, ..., 1> |
+-----+--------------------+
如果用户与该索引处的项目交互,则数组的值为 1,否则为 0。在 pyspark 中是否有一种简单的方法可以做到这一点?
您可以加入 2 个 DataFrame,然后按 user
收集索引列表。
df_users_items = df_users.join(df_items, ["itemId"], "left")
df_user_interations = df_users_items.groupBy("user").agg(collect_set("index").alias("interactions"))
现在使用索引数组创建新数组 interactions
,如下所示:
max_index = df_items.select(max(col("index")).alias("max_index")).first().max_index
interactions_col = array(
*[when(array_contains("interactions", i + 1), lit(1)).otherwise(lit(0)) for i in range(max_index)])
df_user_interations.withColumn("interactions", interactions_col).show()
试试这个!如果需要,您还可以修改或进行任何更正。
from pyspark.sql.functions import col, when, arrays_zip
userIndexes = users.join(items, users.itemId == items.itemId, 'left').crosstab('user', 'index')
cols = userIndexes.columns.filter(_ != 'user')
userIndexes.select('user', arrays_zip([when(col(c).isNull(), lit(0)).otherwise(lit(1)) for c in cols]).alias('interactions')).show()
尽情享受吧!
更新:
设置 Spark 配置:
var sparkConf: SparkConf = null
sparkConf = new SparkConf()
.set("spark.sql.inMemoryColumnarStorage.batchSize", 36000)
IIUC,您可以使用pyspark.ml.feature.CountVectorizer来帮助创建所需的向量。假设 df1 是第一个数据框(用户、itemId 和日期),df2 是第二个数据框(itemId 和索引):
from pyspark.ml.feature import CountVectorizerModel
from pyspark.sql.functions import collect_set
df3 = df1.groupby('user').agg(collect_set('itemId').alias('items_arr'))
# set up the vocabulary from the 2nd dataframe and then create CountVectorizerModel from this list
# set binary=True so that this is doing the same as OneHotEncoder
voc = [ r.itemId for r in df2.select('itemId').sort('index').collect() ]
model = CountVectorizerModel.from_vocabulary(voc, inputCol='items_arr', outputCol='items_vec', binary=True)
df_new = model.transform(df3)
df_new.show(truncate=False)
+----+------------------------------+-------------------------+
|user|items_arr |items_vec |
+----+------------------------------+-------------------------+
|3 |[10005903, 12632813] |(4,[1,2],[1.0,1.0]) |
|1 |[10005903, 12458442, 10005880]|(4,[0,1,3],[1.0,1.0,1.0])|
|2 |[10005903, 12632813] |(4,[1,2],[1.0,1.0]) |
+----+------------------------------+-------------------------+
这将创建一个 SparseVector,如果你想要一个 ArrayType 列,你将需要一个 udf:
from pyspark.sql.functions import udf
udf_to_array = udf(lambda v: [*map(int, v.toArray())], 'array<int>')
df_new.withColumn('interactions', udf_to_array('items_vec')).show(truncate=False)
+----+------------------------------+-------------------------+------------+
|user|items_arr |items_vec |interactions|
+----+------------------------------+-------------------------+------------+
|3 |[10005903, 12632813] |(4,[1,2],[1.0,1.0]) |[0, 1, 1, 0]|
|1 |[10005903, 12458442, 10005880]|(4,[0,1,3],[1.0,1.0,1.0])|[1, 1, 0, 1]|
|2 |[10005903, 12632813] |(4,[1,2],[1.0,1.0]) |[0, 1, 1, 0]|
+----+------------------------------+-------------------------+------------+
我有两个数据框,我需要从中获取信息以生成第三个。第一个数据框包含用户对项目迭代的信息,例如
+-----+-----------+-----------+
|user | itemId |date |
+-----+-----------+-----------+
|1 | 10005880 |2019-07-23 |
|2 | 10005903 |2019-07-23 |
|3 | 10005903 |2019-07-23 |
|1 | 12458442 |2019-07-23 |
|1 | 10005903 |2019-07-26 |
|3 | 12632813 |2019-07-26 |
|2 | 12632813 |2019-07-26 |
+-----+-----------+-----------+
没有特定的顺序,每个用户有多行。 第二个数据框只是一个带有索引的项目列表,例如,
+-----------+-----------+
| itemId |index |
+-----------+-----------+
| 10005880 |1 |
| 10005903 |2 |
| 12458442 |3 |
| ... | ... |
| 12632813 |2000000 |
+-----------+-----------+
这个数据框很长,并不是每个项目都在项目交互数据框中表示。需要的是第三个数据框,其中每一行都包含用户项目交互的矢量化表示,作为单列中的数组,例如
+-----+--------------------+
|user | interactions |
+-----+--------------------+
|1 | <1, 1, 1, ..., 0> |
|2 | <0, 1, 0, ..., 1> |
|3 | <0, 1, 0, ..., 1> |
+-----+--------------------+
如果用户与该索引处的项目交互,则数组的值为 1,否则为 0。在 pyspark 中是否有一种简单的方法可以做到这一点?
您可以加入 2 个 DataFrame,然后按 user
收集索引列表。
df_users_items = df_users.join(df_items, ["itemId"], "left")
df_user_interations = df_users_items.groupBy("user").agg(collect_set("index").alias("interactions"))
现在使用索引数组创建新数组 interactions
,如下所示:
max_index = df_items.select(max(col("index")).alias("max_index")).first().max_index
interactions_col = array(
*[when(array_contains("interactions", i + 1), lit(1)).otherwise(lit(0)) for i in range(max_index)])
df_user_interations.withColumn("interactions", interactions_col).show()
试试这个!如果需要,您还可以修改或进行任何更正。
from pyspark.sql.functions import col, when, arrays_zip
userIndexes = users.join(items, users.itemId == items.itemId, 'left').crosstab('user', 'index')
cols = userIndexes.columns.filter(_ != 'user')
userIndexes.select('user', arrays_zip([when(col(c).isNull(), lit(0)).otherwise(lit(1)) for c in cols]).alias('interactions')).show()
尽情享受吧!
更新: 设置 Spark 配置:
var sparkConf: SparkConf = null
sparkConf = new SparkConf()
.set("spark.sql.inMemoryColumnarStorage.batchSize", 36000)
IIUC,您可以使用pyspark.ml.feature.CountVectorizer来帮助创建所需的向量。假设 df1 是第一个数据框(用户、itemId 和日期),df2 是第二个数据框(itemId 和索引):
from pyspark.ml.feature import CountVectorizerModel
from pyspark.sql.functions import collect_set
df3 = df1.groupby('user').agg(collect_set('itemId').alias('items_arr'))
# set up the vocabulary from the 2nd dataframe and then create CountVectorizerModel from this list
# set binary=True so that this is doing the same as OneHotEncoder
voc = [ r.itemId for r in df2.select('itemId').sort('index').collect() ]
model = CountVectorizerModel.from_vocabulary(voc, inputCol='items_arr', outputCol='items_vec', binary=True)
df_new = model.transform(df3)
df_new.show(truncate=False)
+----+------------------------------+-------------------------+
|user|items_arr |items_vec |
+----+------------------------------+-------------------------+
|3 |[10005903, 12632813] |(4,[1,2],[1.0,1.0]) |
|1 |[10005903, 12458442, 10005880]|(4,[0,1,3],[1.0,1.0,1.0])|
|2 |[10005903, 12632813] |(4,[1,2],[1.0,1.0]) |
+----+------------------------------+-------------------------+
这将创建一个 SparseVector,如果你想要一个 ArrayType 列,你将需要一个 udf:
from pyspark.sql.functions import udf
udf_to_array = udf(lambda v: [*map(int, v.toArray())], 'array<int>')
df_new.withColumn('interactions', udf_to_array('items_vec')).show(truncate=False)
+----+------------------------------+-------------------------+------------+
|user|items_arr |items_vec |interactions|
+----+------------------------------+-------------------------+------------+
|3 |[10005903, 12632813] |(4,[1,2],[1.0,1.0]) |[0, 1, 1, 0]|
|1 |[10005903, 12458442, 10005880]|(4,[0,1,3],[1.0,1.0,1.0])|[1, 1, 0, 1]|
|2 |[10005903, 12632813] |(4,[1,2],[1.0,1.0]) |[0, 1, 1, 0]|
+----+------------------------------+-------------------------+------------+