访问 pyspark 数据框中数组的字段
access fields of an array within pyspark dataframe
我正在开发 sql 对基于一组 ORC 文件的 spark 数据框的查询。程序是这样的:
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName("test").getOrCreate()
sdf = spark_session.read.orc("../data/")
sdf.createOrReplaceTempView("test")
现在我有一个名为 "test" 的 table。如果我这样做:
spark_session.sql("select count(*) from test")
那么结果就好了。但是我需要在查询中获取更多的列,包括数组中的一些字段。
In [8]: sdf.take(1)[0]["person"]
Out[8]:
[Row(name='name', value='tom'),
Row(name='age', value='20'),
Row(name='gender', value='m')]
我试过类似的方法:
spark_session.sql("select person.age, count(*) from test group by person.age")
但这不起作用。我的问题是:如何访问 "person" 数组中的字段?
谢谢!
编辑:
sdf.printSchema()
的结果
In [3]: sdf.printSchema()
root
|-- person: integer (nullable = true)
|-- customtags: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- value: string (nullable = true)
错误信息:
AnalysisException: 'No such struct field age in name, value; line 16 pos 8'
我不知道如何只使用 PySpark-SQL 来做到这一点,但这里有一种使用 PySpark DataFrames 来做到这一点的方法。
基本上,我们可以使用 create_map()
函数将结构列转换为 MapType()
。然后我们可以使用字符串索引直接访问字段。
考虑以下示例:
定义模式
schema = StructType([
StructField('person', IntegerType()),
StructField(
'customtags',
ArrayType(
StructType(
[
StructField('name', StringType()),
StructField('value', StringType())
]
)
)
)
]
)
创建示例 DataFrame
data = [
(
1,
[
{'name': 'name', 'value': 'tom'},
{'name': 'age', 'value': '20'},
{'name': 'gender', 'value': 'm'}
]
),
(
2,
[
{'name': 'name', 'value': 'jerry'},
{'name': 'age', 'value': '20'},
{'name': 'gender', 'value': 'm'}
]
),
(
3,
[
{'name': 'name', 'value': 'ann'},
{'name': 'age', 'value': '20'},
{'name': 'gender', 'value': 'f'}
]
)
]
df = sqlCtx.createDataFrame(data, schema)
df.show(truncate=False)
#+------+------------------------------------+
#|person|customtags |
#+------+------------------------------------+
#|1 |[[name,tom], [age,20], [gender,m]] |
#|2 |[[name,jerry], [age,20], [gender,m]]|
#|3 |[[name,ann], [age,20], [gender,f]] |
#+------+------------------------------------+
将结构列转换为映射
from operator import add
import pyspark.sql.functions as f
df = df.withColumn(
'customtags',
f.create_map(
*reduce(
add,
[
[f.col('customtags')['name'][i],
f.col('customtags')['value'][i]] for i in range(3)
]
)
)
)\
.select('person', 'customtags')
df.show(truncate=False)
#+------+------------------------------------------+
#|person|customtags |
#+------+------------------------------------------+
#|1 |Map(name -> tom, age -> 20, gender -> m) |
#|2 |Map(name -> jerry, age -> 20, gender -> m)|
#|3 |Map(name -> ann, age -> 20, gender -> f) |
#+------+------------------------------------------+
这里的问题是你必须事先知道 ArrayType()
的长度(在本例中为 3),因为我不知道动态循环它的方法。这还假定数组的所有行的长度都相同。
我不得不在这里使用 reduce(add, ...)
,因为 create_map()
需要 (key, value)
形式的元素对。
按地图列中的字段分组
df.groupBy((f.col('customtags')['name']).alias('name')).count().show()
#+-----+-----+
#| name|count|
#+-----+-----+
#| ann| 1|
#|jerry| 1|
#| tom| 1|
#+-----+-----+
df.groupBy((f.col('customtags')['gender']).alias('gender')).count().show()
#+------+-----+
#|gender|count|
#+------+-----+
#| m| 2|
#| f| 1|
#+------+-----+
我正在开发 sql 对基于一组 ORC 文件的 spark 数据框的查询。程序是这样的:
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName("test").getOrCreate()
sdf = spark_session.read.orc("../data/")
sdf.createOrReplaceTempView("test")
现在我有一个名为 "test" 的 table。如果我这样做:
spark_session.sql("select count(*) from test")
那么结果就好了。但是我需要在查询中获取更多的列,包括数组中的一些字段。
In [8]: sdf.take(1)[0]["person"]
Out[8]:
[Row(name='name', value='tom'),
Row(name='age', value='20'),
Row(name='gender', value='m')]
我试过类似的方法:
spark_session.sql("select person.age, count(*) from test group by person.age")
但这不起作用。我的问题是:如何访问 "person" 数组中的字段?
谢谢!
编辑:
sdf.printSchema()
的结果In [3]: sdf.printSchema()
root
|-- person: integer (nullable = true)
|-- customtags: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- value: string (nullable = true)
错误信息:
AnalysisException: 'No such struct field age in name, value; line 16 pos 8'
我不知道如何只使用 PySpark-SQL 来做到这一点,但这里有一种使用 PySpark DataFrames 来做到这一点的方法。
基本上,我们可以使用 create_map()
函数将结构列转换为 MapType()
。然后我们可以使用字符串索引直接访问字段。
考虑以下示例:
定义模式
schema = StructType([
StructField('person', IntegerType()),
StructField(
'customtags',
ArrayType(
StructType(
[
StructField('name', StringType()),
StructField('value', StringType())
]
)
)
)
]
)
创建示例 DataFrame
data = [
(
1,
[
{'name': 'name', 'value': 'tom'},
{'name': 'age', 'value': '20'},
{'name': 'gender', 'value': 'm'}
]
),
(
2,
[
{'name': 'name', 'value': 'jerry'},
{'name': 'age', 'value': '20'},
{'name': 'gender', 'value': 'm'}
]
),
(
3,
[
{'name': 'name', 'value': 'ann'},
{'name': 'age', 'value': '20'},
{'name': 'gender', 'value': 'f'}
]
)
]
df = sqlCtx.createDataFrame(data, schema)
df.show(truncate=False)
#+------+------------------------------------+
#|person|customtags |
#+------+------------------------------------+
#|1 |[[name,tom], [age,20], [gender,m]] |
#|2 |[[name,jerry], [age,20], [gender,m]]|
#|3 |[[name,ann], [age,20], [gender,f]] |
#+------+------------------------------------+
将结构列转换为映射
from operator import add
import pyspark.sql.functions as f
df = df.withColumn(
'customtags',
f.create_map(
*reduce(
add,
[
[f.col('customtags')['name'][i],
f.col('customtags')['value'][i]] for i in range(3)
]
)
)
)\
.select('person', 'customtags')
df.show(truncate=False)
#+------+------------------------------------------+
#|person|customtags |
#+------+------------------------------------------+
#|1 |Map(name -> tom, age -> 20, gender -> m) |
#|2 |Map(name -> jerry, age -> 20, gender -> m)|
#|3 |Map(name -> ann, age -> 20, gender -> f) |
#+------+------------------------------------------+
这里的问题是你必须事先知道 ArrayType()
的长度(在本例中为 3),因为我不知道动态循环它的方法。这还假定数组的所有行的长度都相同。
我不得不在这里使用 reduce(add, ...)
,因为 create_map()
需要 (key, value)
形式的元素对。
按地图列中的字段分组
df.groupBy((f.col('customtags')['name']).alias('name')).count().show()
#+-----+-----+
#| name|count|
#+-----+-----+
#| ann| 1|
#|jerry| 1|
#| tom| 1|
#+-----+-----+
df.groupBy((f.col('customtags')['gender']).alias('gender')).count().show()
#+------+-----+
#|gender|count|
#+------+-----+
#| m| 2|
#| f| 1|
#+------+-----+