How to implode multiple columns into one struct in Spark
I have a Spark dataframe with the following schema:
- headers
- key
- id
- timestamp
- metricVal1
- metricVal2
I want to combine several of these columns into a single struct, so that the resulting schema becomes:
- headers (col)
- key (col)
- value (struct)
  - id (col)
  - timestamp (col)
  - metricVal1 (col)
  - metricVal2 (col)
I want to get the data into this shape so that it is suitable as Kafka input. Please tell me how to achieve this.
You can use struct to group the columns together:
res26.show()
+-------+------+-----+------------+------------+
|Account|Amount|order| meteric1| meteric2|
+-------+------+-----+------------+------------+
| 643100| 10000| 1| 0| 0|
| 234100| 4000| 2| 8589934592| 8589934592|
| 124562| 20000| 9| 17179869184| 17179869184|
| 234567| 5000| 10| 17179869185| 17179869185|
| 643304| 40000| 8| 25769803776| 25769803776|
| 124562| 20000| 9| 34359738368| 34359738368|
| 234567| 5000| 10| 34359738369| 34359738369|
| 643304| 40000| 8| 42949672960| 42949672960|
| 643100| 10000| 1| 51539607552| 51539607552|
| 234100| 4000| 2| 60129542144| 60129542144|
| 231300| 1000| 3| 68719476736| 68719476736|
| 136400| 5000| 4| 77309411328| 77309411328|
| 643841| 20000| 5| 77309411329| 77309411329|
| 432176| 10000| 7| 85899345920| 85899345920|
| 562100| 10000| 6| 94489280512| 94489280512|
| 432176| 10000| 7|103079215104|103079215104|
| 562100| 10000| 6|111669149696|111669149696|
| 231300| 1000| 3|120259084288|120259084288|
| 136400| 5000| 4|128849018880|128849018880|
| 643841| 20000| 5|128849018881|128849018881|
+-------+------+-----+------------+------------+
res26.select(res26("Account"), res26("Amount"), struct(col("order"), col("meteric1"), col("meteric2")).as("Value")).show(true)
+-------+------+--------------------+
|Account|Amount| Value|
+-------+------+--------------------+
| 643100| 10000| [1, 0, 0]|
| 234100| 4000|[2, 8589934592, 8...|
| 124562| 20000|[9, 17179869184, ...|
| 234567| 5000|[10, 17179869185,...|
| 643304| 40000|[8, 25769803776, ...|
| 124562| 20000|[9, 34359738368, ...|
| 234567| 5000|[10, 34359738369,...|
| 643304| 40000|[8, 42949672960, ...|
| 643100| 10000|[1, 51539607552, ...|
| 234100| 4000|[2, 60129542144, ...|
| 231300| 1000|[3, 68719476736, ...|
| 136400| 5000|[4, 77309411328, ...|
| 643841| 20000|[5, 77309411329, ...|
| 432176| 10000|[7, 85899345920, ...|
| 562100| 10000|[6, 94489280512, ...|
| 432176| 10000|[7, 103079215104,...|
| 562100| 10000|[6, 111669149696,...|
| 231300| 1000|[3, 120259084288,...|
| 136400| 5000|[4, 128849018880,...|
| 643841| 20000|[5, 128849018881,...|
+-------+------+--------------------+
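Translated to the column names from the question, the same approach would look roughly like this in PySpark; a minimal sketch, assuming a dataframe df with exactly the columns listed in the question:

from pyspark.sql.functions import struct

# Keep headers and key at the top level and nest the remaining
# columns inside a struct column named "value".
result = df.select(
    'headers',
    'key',
    struct('id', 'timestamp', 'metricVal1', 'metricVal2').alias('value'),
)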
In PySpark: a minimal demo using struct
Create a Spark dataframe from a list of data:
data = [('head1', 'id1', 'timestamp1'), ('head2', 'id2', 'timestamp2'), ('head03', 'id3', 'timestamp3')]
df = spark.createDataFrame(data, ['headers', 'id', 'timestamp'])
df.show()
# +-------+---+----------+
# |headers| id| timestamp|
# +-------+---+----------+
# | head1|id1|timestamp1|
# | head2|id2|timestamp2|
# | head03|id3|timestamp3|
# +-------+---+----------+
# pretty-print dataframe schema
df.printSchema()
# root
# |-- headers: string (nullable = true)
# |-- id: string (nullable = true)
# |-- timestamp: string (nullable = true)
Use struct to collect multiple columns into a struct under a single column:
from pyspark.sql.functions import struct
df1 = df.select('headers', struct('id', 'timestamp').alias('value'))
df1.show()
# +-------+-----------------+
# |headers| value|
# +-------+-----------------+
# | head1|{id1, timestamp1}|
# | head2|{id2, timestamp2}|
# | head03|{id3, timestamp3}|
# +-------+-----------------+
df1.printSchema()
# root
# |-- headers: string (nullable = true)
# |-- value: struct (nullable = false)
# | |-- id: string (nullable = true)
# | |-- timestamp: string (nullable = true)
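To actually use this as Kafka input, the struct still has to be serialized, because the Kafka sink only accepts string or binary key and value columns. A sketch building on df1 above; the broker address and topic name are placeholders, and using headers as the message key is an arbitrary choice for illustration:

from pyspark.sql.functions import col, to_json

# Kafka expects string/binary columns, so serialize the struct to JSON;
# here the headers column is used as the message key.
kafka_df = df1.select(
    col('headers').alias('key'),
    to_json(col('value')).alias('value'),
)

(kafka_df.write
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')  # placeholder broker
    .option('topic', 'events')                            # placeholder topic
    .save())

to_json is one common serialization choice; a binary format such as Avro would be wired in the same way, with a different serialization function in place of to_json.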