PySpark UDF:冷杉变换示例
PySpark UDF: a fir transform example
我是 PySpark 的新手,正在尝试将一些 python 代码翻译成 pyspark。
我从熊猫开始,转换为文档 - 术语矩阵,然后应用 PCA。
UDF:
class MultiLabelCounter():
def __init__(self, classes=None):
self.classes_ = classes
def fit(self,y):
self.classes_ =
sorted(set(itertools.chain.from_iterable(y)))
self.mapping = dict(zip(self.classes_,
range(len(self.classes_))))
return self
def transform(self,y):
yt = []
for labels in y:
data = [0]*len(self.classes_)
for label in labels:
data[self.mapping[label]] +=1
yt.append(data)
return yt
def fit_transform(self,y):
return self.fit(y).transform(y)
mlb = MultiLabelCounter()
df_grouped =
df_grouped.withColumnRenamed("collect_list(full)","full")
udf_mlb = udf(lambda x: mlb.fit_transform(x),IntegerType())
mlb_fitted = df_grouped.withColumn('full',udf_mlb(col("full")))
我当然得到了 NULL 结果。
我使用的是spark 2.4.4版本
编辑
根据请求添加示例输入和输出
输入:
|id|val|
|--|---|
|1|[hello,world]|
|2|[goodbye, world]|
|3|[hello,hello]|
输出:
|id|hello|goodbye|world|
|--|-----|-------|-----|
|1|1|0|1|
|2|0|1|1|
|3|2|0|0|
根据共享的输入数据,我尝试复制您的输出并且成功了。请看下面-
输入数据
df = spark.createDataFrame(data=[(1, ['hello', 'world']), (2, ['goodbye', 'world']), (3, ['hello', 'hello'])], schema=['id', 'vals'])
df.show()
+---+----------------+
| id| vals|
+---+----------------+
| 1| [hello, world]|
| 2|[goodbye, world]|
| 3| [hello, hello]|
+---+----------------+
现在,使用 explode
从 vals
列表项中创建单独的行。此后,使用 pivot
和 count
将计算频率。最后,使用 fillna(0)
将 null
值替换为 0
。见下文-
from pyspark.sql.functions import *
df1 = df.select(['id', explode(col('vals'))]).groupBy("id").pivot("col").agg(count(col("col")))
df1.fillna(0).orderBy("id").show()
输出
+---+-------+-----+-----+
| id|goodbye|hello|world|
+---+-------+-----+-----+
| 1| 0| 1| 1|
| 2| 1| 0| 1|
| 3| 0| 2| 0|
+---+-------+-----+-----+
我是 PySpark 的新手,正在尝试将一些 python 代码翻译成 pyspark。 我从熊猫开始,转换为文档 - 术语矩阵,然后应用 PCA。
UDF:
class MultiLabelCounter():
def __init__(self, classes=None):
self.classes_ = classes
def fit(self,y):
self.classes_ =
sorted(set(itertools.chain.from_iterable(y)))
self.mapping = dict(zip(self.classes_,
range(len(self.classes_))))
return self
def transform(self,y):
yt = []
for labels in y:
data = [0]*len(self.classes_)
for label in labels:
data[self.mapping[label]] +=1
yt.append(data)
return yt
def fit_transform(self,y):
return self.fit(y).transform(y)
mlb = MultiLabelCounter()
df_grouped =
df_grouped.withColumnRenamed("collect_list(full)","full")
udf_mlb = udf(lambda x: mlb.fit_transform(x),IntegerType())
mlb_fitted = df_grouped.withColumn('full',udf_mlb(col("full")))
我当然得到了 NULL 结果。
我使用的是spark 2.4.4版本
编辑
根据请求添加示例输入和输出
输入:
|id|val|
|--|---|
|1|[hello,world]|
|2|[goodbye, world]|
|3|[hello,hello]|
输出:
|id|hello|goodbye|world|
|--|-----|-------|-----|
|1|1|0|1|
|2|0|1|1|
|3|2|0|0|
根据共享的输入数据,我尝试复制您的输出并且成功了。请看下面-
输入数据
df = spark.createDataFrame(data=[(1, ['hello', 'world']), (2, ['goodbye', 'world']), (3, ['hello', 'hello'])], schema=['id', 'vals'])
df.show()
+---+----------------+
| id| vals|
+---+----------------+
| 1| [hello, world]|
| 2|[goodbye, world]|
| 3| [hello, hello]|
+---+----------------+
现在,使用 explode
从 vals
列表项中创建单独的行。此后,使用 pivot
和 count
将计算频率。最后,使用 fillna(0)
将 null
值替换为 0
。见下文-
from pyspark.sql.functions import *
df1 = df.select(['id', explode(col('vals'))]).groupBy("id").pivot("col").agg(count(col("col")))
df1.fillna(0).orderBy("id").show()
输出
+---+-------+-----+-----+
| id|goodbye|hello|world|
+---+-------+-----+-----+
| 1| 0| 1| 1|
| 2| 1| 0| 1|
| 3| 0| 2| 0|
+---+-------+-----+-----+