How to implode multiple columns into one struct in Spark

I have a Spark dataframe with the following schema:

I want to combine multiple columns into one struct, so that the resulting schema becomes:

I want to get it into this format so that it is suitable as Kafka input. Please advise how to achieve this.

You can use struct to group the columns.

res26.show()
+-------+------+-----+------------+------------+
|Account|Amount|order|    meteric1|    meteric2|
+-------+------+-----+------------+------------+
| 643100| 10000|    1|           0|           0|
| 234100|  4000|    2|  8589934592|  8589934592|
| 124562| 20000|    9| 17179869184| 17179869184|
| 234567|  5000|   10| 17179869185| 17179869185|
| 643304| 40000|    8| 25769803776| 25769803776|
| 124562| 20000|    9| 34359738368| 34359738368|
| 234567|  5000|   10| 34359738369| 34359738369|
| 643304| 40000|    8| 42949672960| 42949672960|
| 643100| 10000|    1| 51539607552| 51539607552|
| 234100|  4000|    2| 60129542144| 60129542144|
| 231300|  1000|    3| 68719476736| 68719476736|
| 136400|  5000|    4| 77309411328| 77309411328|
| 643841| 20000|    5| 77309411329| 77309411329|
| 432176| 10000|    7| 85899345920| 85899345920|
| 562100| 10000|    6| 94489280512| 94489280512|
| 432176| 10000|    7|103079215104|103079215104|
| 562100| 10000|    6|111669149696|111669149696|
| 231300|  1000|    3|120259084288|120259084288|
| 136400|  5000|    4|128849018880|128849018880|
| 643841| 20000|    5|128849018881|128849018881|
+-------+------+-----+------------+------------+


res26.select(res26("Account"), res26("Amount"), struct(col("order"), col("meteric1"), col("meteric2")).as("Value")).show(true)
+-------+------+--------------------+
|Account|Amount|               Value|
+-------+------+--------------------+
| 643100| 10000|           [1, 0, 0]|
| 234100|  4000|[2, 8589934592, 8...|
| 124562| 20000|[9, 17179869184, ...|
| 234567|  5000|[10, 17179869185,...|
| 643304| 40000|[8, 25769803776, ...|
| 124562| 20000|[9, 34359738368, ...|
| 234567|  5000|[10, 34359738369,...|
| 643304| 40000|[8, 42949672960, ...|
| 643100| 10000|[1, 51539607552, ...|
| 234100|  4000|[2, 60129542144, ...|
| 231300|  1000|[3, 68719476736, ...|
| 136400|  5000|[4, 77309411328, ...|
| 643841| 20000|[5, 77309411329, ...|
| 432176| 10000|[7, 85899345920, ...|
| 562100| 10000|[6, 94489280512, ...|
| 432176| 10000|[7, 103079215104,...|
| 562100| 10000|[6, 111669149696,...|
| 231300|  1000|[3, 120259084288,...|
| 136400|  5000|[4, 128849018880,...|
| 643841| 20000|[5, 128849018881,...|
+-------+------+--------------------+
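Since the question ultimately wants this frame as Kafka input: Kafka's DataFrame sink expects string or binary key and value columns, so the struct is usually serialized with to_json before writing. A minimal PySpark sketch of that final step; the variable source_df, the broker address, and the topic name are hypothetical stand-ins:

from pyspark.sql.functions import col, struct, to_json

# Kafka needs string/binary "key" and "value" columns, so cast the key
# to string and serialize the struct to a JSON string. This requires the
# spark-sql-kafka connector package on the classpath.
kafka_ready = source_df.select(
    col('Account').cast('string').alias('key'),
    to_json(struct('order', 'meteric1', 'meteric2')).alias('value'),
)
kafka_ready.write \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('topic', 'transactions') \
    .save()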

In PySpark: a minimal demo using struct

Create a Spark dataframe from a list of data

data = [('head1', 'id1', 'timestamp1'), ('head2', 'id2', 'timestamp2'), ('head03', 'id3', 'timestamp3')]
df = spark.createDataFrame(data, ['headers', 'id', 'timestamp'])
df.show()
# +-------+---+----------+                                                        
# |headers| id| timestamp|
# +-------+---+----------+
# |  head1|id1|timestamp1|
# |  head2|id2|timestamp2|
# | head03|id3|timestamp3|
# +-------+---+----------+

# pretty-print dataframe schema  
df.printSchema()
# root
#  |-- headers: string (nullable = true)
#  |-- id: string (nullable = true)
#  |-- timestamp: string (nullable = true)

Use struct to gather multiple columns into a single struct column

from pyspark.sql.functions import struct
df1 = df.select('headers', struct('id', 'timestamp').alias('value'))
df1.show()
# +-------+-----------------+
# |headers|            value|
# +-------+-----------------+
# |  head1|{id1, timestamp1}|
# |  head2|{id2, timestamp2}|
# | head03|{id3, timestamp3}|
# +-------+-----------------+
df1.printSchema()
# root
#  |-- headers: string (nullable = true)
#  |-- value: struct (nullable = false)
#  |    |-- id: string (nullable = true)
#  |    |-- timestamp: string (nullable = true)
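
As a quick check, the grouped fields remain individually addressable via dot notation on the struct column:

# Pull a single field back out of the struct column.
df1.select('headers', 'value.id').show()
# +-------+---+
# |headers| id|
# +-------+---+
# |  head1|id1|
# |  head2|id2|
# | head03|id3|
# +-------+---+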