Pyspark merge columns
I have a dataframe like below:
+---+---+---+---+
| a| a| c| c|
+---+---+---+---+
| z| z1| a| z|
| b| z2| a| c|
| c| z3| b| a|
| d| z4| a| a|
+---+---+---+---+
I need to merge the common columns, pairing every a-type value with every c-type value in each row. Expected output:
+---+---+
| a| c|
+---+---+
| z | a |
| z | z |
| b | a |
| b | c |
| c | b |
| c | a |
| d | a |
| d | a |
| z1| a |
| z1| z |
| z2| a |
| z2| c |
| z3| b |
| z3| a |
| z4| a |
| z4| a |
+---+---+
I tried this code:
old_col = df.schema.names
running_list = []
new_col = []
i = 0
for column in old_col:
    if column in running_list:
        # Duplicate name: make it unique with a numeric suffix.
        new_col.append(column + "_" + str(i))
        i = i + 1
    else:
        new_col.append(column)
        running_list.append(column)
print(new_col)
df1 = df.toDF(*new_col)
It only de-duplicates the column names instead of merging the data.
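For reference, here is what that loop actually produces on the (a, a, c, c) header above, so df1 still has four separate columns:

# Trace of the renaming loop on old_col == ['a', 'a', 'c', 'c']:
#   'a' -> kept as 'a'        (first occurrence)
#   'a' -> renamed to 'a_0'   (duplicate, suffix from i=0)
#   'c' -> kept as 'c'
#   'c' -> renamed to 'c_1'   (duplicate, suffix from i=1)
print(new_col)  # ['a', 'a_0', 'c', 'c_1']
df1.show()      # still 4 rows x 4 columns -- no values are merged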
You can do it the following way:
from pyspark.sql import types

# Target schema for the merged result: one "a" column and one "c" column.
schema = types.StructType([
    types.StructField('a', types.StringType()),
    types.StructField('c', types.StringType()),
])

# Give the duplicated columns unique names first.
df = df.toDF(*["a", "b", "c", "d"])

# Start empty, then union every (left, right) pair renamed to ("a", "c").
df2 = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
for c1 in ["a", "b"]:
    for c2 in ["c", "d"]:
        df2 = df2.union(df.select(c1, c2).withColumnRenamed(c1, "a").withColumnRenamed(c2, "c"))
Each pass of the nested loop appends one renamed column pair, which gives the following result:
df2.show()
+---+---+
| a| c|
+---+---+
| z| a|
| b| a|
| c| b|
| d| a|
| z| z|
| b| c|
| c| a|
| d| a|
| z1| a|
| z2| a|
| z3| b|
| z4| a|
| z1| z|
| z2| c|
| z3| a|
| z4| a|
+---+---+
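If there can be more than two duplicated columns on either side, a minimal sketch of the same idea that generalizes is to build all the pair selections first and fold them together with a union, which also avoids the empty seed DataFrame. This assumes Spark 2.3+ for unionByName; left_cols and right_cols are hypothetical names for the renamed column groups:

from functools import reduce
from pyspark.sql import DataFrame

# Hypothetical groupings after df.toDF(*["a", "b", "c", "d"]):
left_cols = ["a", "b"]   # all values that should end up in column "a"
right_cols = ["c", "d"]  # all values that should end up in column "c"

# One small DataFrame per (left, right) pair, already renamed to (a, c).
pairs = [
    df.select(df[c1].alias("a"), df[c2].alias("c"))
    for c1 in left_cols
    for c2 in right_cols
]

# Fold the pair DataFrames together; unionByName matches columns by name.
df2 = reduce(DataFrame.unionByName, pairs)
df2.show()

Since Spark unions are plan transformations, folding a handful of selections like this is cheap; for the 2x2 case it is equivalent to the nested loop above.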