Pyspark:连接列,其中名称在另一个列中给出

Pyspark : concat columns where the name is given in another one

我有 2 个数据帧

df1= 
+--------------+
|questions     |
+--------------+
|[Q1, Q2]      |
|[Q4, Q6, Q7]  |
|...           |
+---+----------+

df2 = 
+--------------------+---+---+---+---+
| Q1| Q2| Q3| Q4| Q6| Q7 | ...   |Q25|
+--------------------+---+---+---+---+
|  1|  0|  1|  0|  0| 1  | ...   |  1|
+--------------------+---+---+---+---+

我想在第一个数据框中添加一个新列,所有列的值都定义为 df1.questions

预期结果

df1 = 
+--------------++--------------+
|questions     |values
+--------------+---------------+
|[Q1, Q2]      |[1, 0]         |
|[Q4, Q6, Q7]  |[0, 0, 1]      |
|...           |               |
+---+----------++--------------+
 

当我做的时候

 cols_to_link = ['Q1', 'Q2']
 df2= df2.select([col for col in cols_to_link])\
 df2 = df2.withColumn('value', F.concat_ws(", ", *df2.columns))

附加列是我想要的,但我无法通过混合数据帧来实现

当我使用 df2 时它也有效

 df2 = df2.select([col for col in df1.select('questions').collect()[0][0]])\
 df2 = df2.withColumn('value', F.concat_ws(", ", *df2.columns))

但不是当我想从 df1 去的时候

 df1= df1\
    .withColumn('value', F.concat_ws(", ", *df2.select([col for col in df1.select('questions').collect()])))

我哪里错了?

来自我的示例数据框,

# df1
+------------+
|   questions|
+------------+
|    [Q1, Q2]|
|[Q4, Q6, Q7]|
+------------+

# df2
+---+---+---+---+---+---+
| Q1| Q2| Q3| Q4| Q6| Q7|
+---+---+---+---+---+---+
|  1|  0|  1|  0|  0|  1|
+---+---+---+---+---+---+

我已经创建了垂直数据框并加入。一般来说,您不能引用其他数据框中的列。

cols = df2.columns
df = df2.rdd.flatMap(lambda row: [[cols[i], row[i]] for i in range(0, len(row))]).toDF(['id', 'values'])
df.show()

+---+------+
| id|values|
+---+------+
| Q1|     1|
| Q2|     0|
| Q3|     1|
| Q4|     0|
| Q6|     0|
| Q7|     1|
+---+------+

df1.join(df, f.expr('array_contains(questions, id)'), 'left') \
  .groupBy('questions').agg(f.collect_list('values').alias('values')) \
  .show()

+------------+---------+
|   questions|   values|
+------------+---------+
|    [Q1, Q2]|   [1, 0]|
|[Q4, Q6, Q7]|[0, 0, 1]|
+------------+---------+