在pyspark中,如何通过一列数据框循环过滤功能?
In pyspark, how to loop filter function through a column of data frame?
这是我的数据:
**name** **movie**
jason a
jason b
jason c
mike a
mike b
bruce a
bruce c
ryan b
我的目标是做到这一点
**name** **# of moive**
jason a,b,c
mike a,b
bruce a,c
ryan b
我正在使用 pyspark 并尝试使用 UDF 来做这个工作人员。我定义了这个函数,spark 给了我一个错误,因为它调用了基本函数 'filter',这使得启动一个新的 worker 出现问题(如果没有,请纠正我)。
我的逻辑是先用过滤器做子集,然后行数就是电影数。然后我用这个 UDF 创建了一个新专栏。
def udf(user_name):
return df.filter(df['name'] == user_name).select('movie').dropDuplictes()\
.toPandas['movie'].tolist()
df.withColumn('movie_number', udf(df['name']))
但它不起作用。有没有办法用基本的 spark 函数制作 UDF?
所以我把名字列做成一个列表,然后循环遍历列表,但是超级慢我相信我没有做分布式计算。
1) 我的首要任务是弄清楚如何使用 spark_df.filter
.
等基本功能循环访问 pyspark 数据帧的一列中的信息
2) 可不可以先把name列做成一个RDD,然后用我的UDF循环遍历那个RDD,这样就可以利用分布式计算了?
3) 如果我有 2 个具有相同结构 (name/movie) 的 table,但年份不同,例如 2005 年和 2007 年,我们能否有一种有效的方法来制作第三个 table 其结构是:
**name** **movie** **in_2005** **in_2007**
jason a 1 0
jason b 0 1
jason c 1 1
mike a 0 1
mike b 1 0
bruce a 0 0
bruce c 1 1
ryan b 1 0
1 和 0 表示此人是否在 2005/2007 年对电影发表评论。在这种情况下,原来的 tables 将是:
2005:
**name** **movie**
jason a
jason c
mike b
bruce c
ryan b
2007
**name** **movie**
jason b
jason c
mike a
bruce c
我的想法是将 2 个 table 与一个 'year' 列连接在一起,并使用一个主元 table 来获得所需的结构。
我建议使用 groupby
后跟 collect_list
而不是将整个数据帧转换为 RDD。之后可以申请UDF。
import pyspark.sql.functions as func
# toy example dataframe
ls = [
['jason', 'movie_1'],
['jason', 'movie_2'],
['jason', 'movie_3'],
['mike', 'movie_1'],
['mike', 'movie_2'],
['bruce', 'movie_1'],
['bruce', 'movie_3'],
['ryan', 'movie_2']
]
df = spark.createDataFrame(pd.DataFrame(ls, columns=['name', 'movie']))
df_movie = df.groupby('name').agg(func.collect_list(func.col('movie')))
现在,这是创建 udf
来处理新列 movies
的示例。我简单举个例子,计算每一行的长度。
def movie_len(movies):
return len(movies)
udf_movie_len = func.udf(movie_len, returnType=StringType())
df_movie.select('name', 'movies', udf_movie_len(func.col('movies')).alias('n_movies')).show()
这将得到:
+-----+--------------------+--------+
| name| movies|n_movies|
+-----+--------------------+--------+
|jason|[movie_1, movie_2...| 3|
| ryan| [movie_2]| 1|
|bruce| [movie_1, movie_3]| 2|
| mike| [movie_1, movie_2]| 2|
+-----+--------------------+--------+
这是我的数据:
**name** **movie**
jason a
jason b
jason c
mike a
mike b
bruce a
bruce c
ryan b
我的目标是做到这一点
**name** **# of moive**
jason a,b,c
mike a,b
bruce a,c
ryan b
我正在使用 pyspark 并尝试使用 UDF 来做这个工作人员。我定义了这个函数,spark 给了我一个错误,因为它调用了基本函数 'filter',这使得启动一个新的 worker 出现问题(如果没有,请纠正我)。
我的逻辑是先用过滤器做子集,然后行数就是电影数。然后我用这个 UDF 创建了一个新专栏。
def udf(user_name):
return df.filter(df['name'] == user_name).select('movie').dropDuplictes()\
.toPandas['movie'].tolist()
df.withColumn('movie_number', udf(df['name']))
但它不起作用。有没有办法用基本的 spark 函数制作 UDF?
所以我把名字列做成一个列表,然后循环遍历列表,但是超级慢我相信我没有做分布式计算。
1) 我的首要任务是弄清楚如何使用 spark_df.filter
.
2) 可不可以先把name列做成一个RDD,然后用我的UDF循环遍历那个RDD,这样就可以利用分布式计算了?
3) 如果我有 2 个具有相同结构 (name/movie) 的 table,但年份不同,例如 2005 年和 2007 年,我们能否有一种有效的方法来制作第三个 table 其结构是:
**name** **movie** **in_2005** **in_2007**
jason a 1 0
jason b 0 1
jason c 1 1
mike a 0 1
mike b 1 0
bruce a 0 0
bruce c 1 1
ryan b 1 0
1 和 0 表示此人是否在 2005/2007 年对电影发表评论。在这种情况下,原来的 tables 将是:
2005:
**name** **movie**
jason a
jason c
mike b
bruce c
ryan b
2007
**name** **movie**
jason b
jason c
mike a
bruce c
我的想法是将 2 个 table 与一个 'year' 列连接在一起,并使用一个主元 table 来获得所需的结构。
我建议使用 groupby
后跟 collect_list
而不是将整个数据帧转换为 RDD。之后可以申请UDF。
import pyspark.sql.functions as func
# toy example dataframe
ls = [
['jason', 'movie_1'],
['jason', 'movie_2'],
['jason', 'movie_3'],
['mike', 'movie_1'],
['mike', 'movie_2'],
['bruce', 'movie_1'],
['bruce', 'movie_3'],
['ryan', 'movie_2']
]
df = spark.createDataFrame(pd.DataFrame(ls, columns=['name', 'movie']))
df_movie = df.groupby('name').agg(func.collect_list(func.col('movie')))
现在,这是创建 udf
来处理新列 movies
的示例。我简单举个例子,计算每一行的长度。
def movie_len(movies):
return len(movies)
udf_movie_len = func.udf(movie_len, returnType=StringType())
df_movie.select('name', 'movies', udf_movie_len(func.col('movies')).alias('n_movies')).show()
这将得到:
+-----+--------------------+--------+
| name| movies|n_movies|
+-----+--------------------+--------+
|jason|[movie_1, movie_2...| 3|
| ryan| [movie_2]| 1|
|bruce| [movie_1, movie_3]| 2|
| mike| [movie_1, movie_2]| 2|
+-----+--------------------+--------+