How can I use the output of an aggregation as input to withColumn?

I am trying to analyze some data with a PySpark dataframe that contains strings, timestamps, integers, and floats.

Toy df:

sdf1 = 
+---+---+---+
|id1|id2|id3|
+---+---+---+
| 1 |"a"| 4 |
| 2 |"a"| 6 |
| 1 |"a"| 7 |
| 3 |"a"| 9 |
+---+---+---+


sdf2 = 
+---+
|ids|
+---+
|id1|
|id2|
|id3|
+---+
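
For reproducibility, here is a minimal sketch that builds these toy frames; it assumes an active SparkSession bound to the usual spark variable:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sdf1 = spark.createDataFrame(
    [(1, "a", 4), (2, "a", 6), (1, "a", 7), (3, "a", 9)],
    ["id1", "id2", "id3"],
)
sdf2 = spark.createDataFrame([("id1",), ("id2",), ("id3",)], ["ids"])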

This is what I am trying to achieve:

from pyspark.sql import functions as F

agg_instructions = [F.max(x).alias("{0}".format(x)) for x in sdf1.columns]

sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions))

This should result in the following dataframe. However, it doesn't work. Is there a workaround?

sdf3 = 
+---+---+
|ids|max|
+---+---+
|id1| 3 |
|id2|"a"|
|id3| 9 |
+---+---+

I get the following error:

AssertionError                            Traceback (most recent call last)
<ipython-input> in <module>()
      7 agg_instructions = [F.max(x).alias("{0}".format(x)) for x in data_sdf.columns]
      8 
----> 9 sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions))
     10 
     11 test = test.reset_index()

/databricks/spark/python/pyspark/sql/dataframe.py in withColumn(self, colName, col)
   2011 
   2012         """
-> 2013         assert isinstance(col, Column), "col should be Column"
   2014         return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
   2015 

AssertionError: col should be Column
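
Why this fails: withColumn expects a Column, but DataFrame.agg returns a new one-row DataFrame, which is exactly what the isinstance assertion rejects. As a minimal sketch of one driver-side workaround (variable names are illustrative), the aggregate row can be collected and its values re-injected as literals:

from pyspark.sql import functions as F

# agg() yields a one-row DataFrame; first() pulls that Row to the driver.
max_row = sdf1.agg(*agg_instructions).first()  # Row(id1=3, id2='a', id3=9)

# A single collected maximum can then be attached as a literal Column.
sdf_with_max = sdf2.withColumn("max_id1", F.lit(max_row["id1"]))

That only attaches constants, though; the per-ids layout you want is easier to build directly, as shown below.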

Your approach is more complicated than it needs to be. You can get the desired output from sdf1 alone.

One way is to create an array column holding one struct per column name with its corresponding max value, then explode it and select the struct fields.

Here is an example:

from pyspark.sql.functions import array, col, explode, lit, max, struct

data = [(1, "a", 4), (2, "a", 6), (1, "a", 7), (3, "a", 9)]
df = spark.createDataFrame(data, ["id1", "id2", "id3"])

# One struct per column: the column name and its max (cast to string so all
# array elements share the same type).
agg_instructions = array(
    *[struct(lit(c).alias("ids"), max(col(c)).cast("string").alias("max")) for c in df.columns]
)

# Aggregate to a single array, explode to one row per column, then flatten.
df.agg(agg_instructions.alias("agg")) \
  .withColumn("agg", explode(col("agg"))) \
  .select("agg.*") \
  .show(truncate=False)

#+---+---+
#|ids|max|
#+---+---+
#|id1|3  |
#|id2|a  |
#|id3|9  |
#+---+---+
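
A note on the .cast("string"): all elements of an array column must share one type, and the maxima of id1/id3 are integers while id2's is a string, so every value is normalized to string before being packed into the structs.

If collecting to the driver is acceptable for a one-row aggregate, an equivalent sketch (illustrative names) that skips the explode entirely:

from pyspark.sql import functions as F

# Collect the single aggregate Row, then rebuild a small (ids, max) frame.
row = df.agg(*[F.max(c).alias(c) for c in df.columns]).first()
sdf3 = spark.createDataFrame(
    [(k, str(v)) for k, v in row.asDict().items()],
    ["ids", "max"],
)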