我如何使用聚合的输出作为 withColumn 的输入
How can i use output of an aggregation as input to withColumn
我正在尝试使用包含字符串、时间戳、整数和浮点数的 pyspark 数据帧来分析一些数据。
玩具 df:
sdf1 =
|id1|id2|id3|
+---+---+---+
| 1 |"a"| 4 |
+---+---+---+
| 2 |"a"| 6 |
+---+---+---+
| 1 |"a"| 7 |
+---+---+---+
| 3 |"a"| 9 |
+---+---+---+
sdf2 =
|ids|
+---+
|id1|
+---+
|id2|
+---+
|id3|
+---+
我正在努力实现以下目标
agg_instructions = [F.max(x).alias("{0}".format(x)) for x in sdf1.columns]
sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions))
这将导致以下数据帧。
然而,这不起作用 - 有解决办法吗?
sdf3 =
|ids|max|
+---+---+
|id1| 3 |
+---+---+
|id2|"a"|
+---+---+
|id3| 9 |
+---+---+
我收到以下错误:
AssertionError Traceback (most recent call
last) in ()
7 agg_instructions = [F.max(x).alias("{0}".format(x)) for x in data_sdf.columns]
8
----> 9 sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions))
10
11 test = test.reset_index()
/databricks/spark/python/pyspark/sql/dataframe.py in withColumn(self,
colName, col) 2011 2012 """
-> 2013 assert isinstance(col, Column), "col should be Column" 2014 return DataFrame(self._jdf.withColumn(colName, col._jc),
self.sql_ctx) 2015
AssertionError: col should be Column
这对于您想要实现的目标来说太多了。您可以仅从 sdf1
获得所需的输出。
一种方法是创建一个数组列,其中包含列名结构及其对应的最大值。然后分解它和 select 结构字段。
这是一个例子:
data = [(1, "a", 4), (2, "a", 6), (1, "a", 7), (3, "a", 9)]
df = spark.createDataFrame(data, ["id1", "id2", "id3"])
agg_instructions = array(
*[struct(lit(c).alias("ids"), max(col(c)).cast("string").alias("max")) for c in df.columns]
)
df.agg(agg_instructions.alias("agg")) \
.withColumn("agg", explode(col("agg"))) \
.select("agg.*") \
.show()
#+---+---+
#|ids|max|
#+---+---+
#|id1|3 |
#|id2|a |
#|id3|9 |
#+---+---+
我正在尝试使用包含字符串、时间戳、整数和浮点数的 pyspark 数据帧来分析一些数据。
玩具 df:
sdf1 =
|id1|id2|id3|
+---+---+---+
| 1 |"a"| 4 |
+---+---+---+
| 2 |"a"| 6 |
+---+---+---+
| 1 |"a"| 7 |
+---+---+---+
| 3 |"a"| 9 |
+---+---+---+
sdf2 =
|ids|
+---+
|id1|
+---+
|id2|
+---+
|id3|
+---+
我正在努力实现以下目标
agg_instructions = [F.max(x).alias("{0}".format(x)) for x in sdf1.columns]
sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions))
这将导致以下数据帧。 然而,这不起作用 - 有解决办法吗?
sdf3 =
|ids|max|
+---+---+
|id1| 3 |
+---+---+
|id2|"a"|
+---+---+
|id3| 9 |
+---+---+
我收到以下错误:
AssertionError Traceback (most recent call last) in () 7 agg_instructions = [F.max(x).alias("{0}".format(x)) for x in data_sdf.columns] 8 ----> 9 sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions)) 10 11 test = test.reset_index()
/databricks/spark/python/pyspark/sql/dataframe.py in withColumn(self, colName, col) 2011 2012 """ -> 2013 assert isinstance(col, Column), "col should be Column" 2014 return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx) 2015
AssertionError: col should be Column
这对于您想要实现的目标来说太多了。您可以仅从 sdf1
获得所需的输出。
一种方法是创建一个数组列,其中包含列名结构及其对应的最大值。然后分解它和 select 结构字段。
这是一个例子:
data = [(1, "a", 4), (2, "a", 6), (1, "a", 7), (3, "a", 9)]
df = spark.createDataFrame(data, ["id1", "id2", "id3"])
agg_instructions = array(
*[struct(lit(c).alias("ids"), max(col(c)).cast("string").alias("max")) for c in df.columns]
)
df.agg(agg_instructions.alias("agg")) \
.withColumn("agg", explode(col("agg"))) \
.select("agg.*") \
.show()
#+---+---+
#|ids|max|
#+---+---+
#|id1|3 |
#|id2|a |
#|id3|9 |
#+---+---+