PySpark 将行合并到列 StackOverFlow 错误
PySpark Combine Rows to Columns StackOverFlow Error
我想要的(非常简单):
Input Dataset to Output dataset
我试过的一些代码:
def add_columns(cur_typ, target, value):
if cur_typ == target:
return value
return None
schema = T.StructType([T.StructField("name", T.StringType(), True),
T.StructField("typeT", T.StringType(), True),
T.StructField("value", T.IntegerType(), True)])
data = [("x", "a", 3), ("x", "b", 5), ("x", "c", 7), ("y", "a", 1), ("y", "b", 2),
("y", "c", 4), ("z", "a", 6), ("z", "b", 2), ("z", "c", 3)]
df = ctx.spark_session.createDataFrame(ctx.spark_session.sparkContext.parallelize(data), schema)
targets = [i.typeT for i in df.select("typeT").distinct().collect()]
add_columns = F.udf(add_columns)
w = Window.partitionBy('name')
for target in targets:
df = df.withColumn(target, F.max(F.lit(add_columns(df["typeT"], F.lit(target), df["value"]))).over(w))
df = df.drop("typeT", "value").dropDuplicates()
另一个版本:
targets = df.select(F.collect_set("typeT").alias("typeT")).first()["typeT"]
w = Window.partitionBy('name')
for target in targets:
df = df.withColumn(target, F.max(F.lit(F.when(veh["typeT"] == F.lit(target), veh["value"])
.otherwise(None)).over(w)))
df = df.drop("typeT", "value").dropDuplicates()
对于小型数据集,两者都有效,但我有一个包含 100 万行和 5000 种不同 typeT 的数据框。
所以结果应该是 table 大约 500 x 5000(有些名字没有特定的 typeT。
现在我收到 Whosebug 错误(py4j.protocol.Py4JJavaError:调用 o7624.withColumn 时发生错误。
: java.lang.WhosebugError) 试图创建这个数据框。除了增加堆栈大小,我还能做什么?有没有更好的方法来获得相同的结果?
在循环中使用withColumn
不好,如果没有要添加的列更多。
创建一个列数组,然后 select 它们,这将带来更好的性能
cols = [F.col("name")]
for target in targets:
cols.append(F.max(F.lit(add_columns(df["typeT"], F.lit(target), df["value"]))).over(w).alias(target))
df = df.select(cols)
结果相同
+----+---+---+---+
|name| c| b| a|
+----+---+---+---+
| x| 7| 5| 3|
| z| 3| 2| 6|
| y| 4| 2| 1|
+----+---+---+---+
我想要的(非常简单):
Input Dataset to Output dataset
我试过的一些代码:
def add_columns(cur_typ, target, value):
if cur_typ == target:
return value
return None
schema = T.StructType([T.StructField("name", T.StringType(), True),
T.StructField("typeT", T.StringType(), True),
T.StructField("value", T.IntegerType(), True)])
data = [("x", "a", 3), ("x", "b", 5), ("x", "c", 7), ("y", "a", 1), ("y", "b", 2),
("y", "c", 4), ("z", "a", 6), ("z", "b", 2), ("z", "c", 3)]
df = ctx.spark_session.createDataFrame(ctx.spark_session.sparkContext.parallelize(data), schema)
targets = [i.typeT for i in df.select("typeT").distinct().collect()]
add_columns = F.udf(add_columns)
w = Window.partitionBy('name')
for target in targets:
df = df.withColumn(target, F.max(F.lit(add_columns(df["typeT"], F.lit(target), df["value"]))).over(w))
df = df.drop("typeT", "value").dropDuplicates()
另一个版本:
targets = df.select(F.collect_set("typeT").alias("typeT")).first()["typeT"]
w = Window.partitionBy('name')
for target in targets:
df = df.withColumn(target, F.max(F.lit(F.when(veh["typeT"] == F.lit(target), veh["value"])
.otherwise(None)).over(w)))
df = df.drop("typeT", "value").dropDuplicates()
对于小型数据集,两者都有效,但我有一个包含 100 万行和 5000 种不同 typeT 的数据框。 所以结果应该是 table 大约 500 x 5000(有些名字没有特定的 typeT。 现在我收到 Whosebug 错误(py4j.protocol.Py4JJavaError:调用 o7624.withColumn 时发生错误。 : java.lang.WhosebugError) 试图创建这个数据框。除了增加堆栈大小,我还能做什么?有没有更好的方法来获得相同的结果?
在循环中使用withColumn
不好,如果没有要添加的列更多。
创建一个列数组,然后 select 它们,这将带来更好的性能
cols = [F.col("name")]
for target in targets:
cols.append(F.max(F.lit(add_columns(df["typeT"], F.lit(target), df["value"]))).over(w).alias(target))
df = df.select(cols)
结果相同
+----+---+---+---+
|name| c| b| a|
+----+---+---+---+
| x| 7| 5| 3|
| z| 3| 2| 6|
| y| 4| 2| 1|
+----+---+---+---+