Rearrange Array of Struct to Array of a Struct with a field as Array in Pyspark
I have a "simple" DataFrame with an array of struct(nome, h_0, h_1, ..., h_23), and I would like to rearrange this column into an array of struct(nome, array(h_0, h_1, ..., h_23)).
As is:
root
|-- array_area: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- nome: string (nullable = true)
| | |-- h_0: string (nullable = true)
| | |-- h_1: string (nullable = true)
| | |-- h_10: string (nullable = true)
| | |-- h_11: string (nullable = true)
| | |-- h_12: string (nullable = true)
| | |-- h_13: string (nullable = true)
| | |-- h_14: string (nullable = true)
| | |-- h_15: string (nullable = true)
| | |-- h_16: string (nullable = true)
| | |-- h_17: string (nullable = true)
| | |-- h_18: string (nullable = true)
| | |-- h_19: string (nullable = true)
| | |-- h_2: string (nullable = true)
| | |-- h_20: string (nullable = true)
| | |-- h_21: string (nullable = true)
| | |-- h_22: string (nullable = true)
| | |-- h_23: string (nullable = true)
| | |-- h_3: string (nullable = true)
| | |-- h_4: string (nullable = true)
| | |-- h_5: string (nullable = true)
| | |-- h_6: string (nullable = true)
| | |-- h_7: string (nullable = true)
| | |-- h_8: string (nullable = true)
| | |-- h_9: string (nullable = true)
I want:
root
|-- array_area: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- nome: string (nullable = true)
| | |-- circadiana: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- h_0: string (nullable = true)
| | | | |-- h_1: string (nullable = true)
| | | | |-- h_2: string (nullable = true)
| | | | |-- ... until h_23
I used a UDF like this:
concat_udf = F.udf(lambda arr: F.array(F.struct(x["nome"], F.array(x["h_0"],x["h_1"],x["h_2"],x["h_3"],x["h_4"],x["h_5"],x["h_6"],x["h_7"],x["h_8"],x["h_9"],x["h_10"],x["h_11"],x["h_12"],x["h_13"],x["h_14"],x["h_15"],x["h_16"],x["h_17"],x["h_18"],x["h_19"],x["h_20"],x["h_21"],x["h_22"],x["h_23"])) for x in arr),
ArrayType(StructType([StructField("nome", StringType(), True),StructField("circadiana", ArrayType(StringType()), True)])))
printSchema is fine! But when I view the data with show():
df_new=df.withColumn("area_provenienza_X",concat_udf(F.col("array_area"))).show()
I get this error:
File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
for obj in iterator:
File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
for item in iterator:
File "<string>", line 1, in <lambda>
File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/worker.py", line 83, in <lambda>
return lambda *a: toInternal(f(*a))
File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "LoadFileSIMO112_dati_aggregati.py", line 150, in <lambda>
x["h_23"])) for x in arr),
File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1869, in array
jc = sc._jvm.functions.array(_to_seq(sc, cols, _to_java_column))
AttributeError: 'NoneType' object has no attribute '_jvm'
Sample data:
"area": [{
"nome": "Extra",
"h_0": "0",
"h_1": "0",
"h_2": "0",
"h_3": "0",
"h_4": "0",
"h_5": "0",
"h_6": "1",
"h_7": "0",
"h_8": "0",
"h_9": "0",
"h_10": "1",
"h_11": "1",
"h_12": "0",
"h_13": "1",
"h_14": "0",
"h_15": "0",
"h_16": "0",
"h_17": "1",
"h_18": "0",
"h_19": "1",
"h_20": "0",
"h_21": "1",
"h_22": "0",
"h_23": "1"
},
{
"nome": "ROMA CAP",
"h_0": "130",
"h_1": "94",
"h_2": "116",
"h_3": "61",
"h_4": "54",
"h_5": "47",
"h_6": "58",
"h_7": "57",
"h_8": "87",
"h_9": "0",
"h_10": "0",
"h_11": "0",
"h_12": "0",
"h_13": "0",
"h_14": "0",
"h_15": "0",
"h_16": "0",
"h_17": "0",
"h_18": "0",
"h_19": "0",
"h_20": "0",
"h_21": "0",
"h_22": "0",
"h_23": "124"
}]
I want:
"area": [{
"nome": "Extra",
"circadiana":[0,0,0,0,0,0,1,0,0,0,1,1,0,1,0,0,0,1,0,1,0,1,0,1]
},
{
"nome": "ROMA CAP",
"circadiana":[130,94,116,61,54,47,58,87,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,124]
}]
There are no null values in my DataFrame (df), though.
Thanks for your help.
Best regards
You can't call pyspark.sql.functions such as F.array and F.struct inside a UDF: those functions build JVM column expressions through the driver's SparkContext, which doesn't exist on the executors, and that is exactly what AttributeError: 'NoneType' object has no attribute '_jvm' is telling you. A plain-Python UDF would work (see the sketch just below), but the cleaner approach is Spark's built-in transform function, which converts each element of the array into the desired struct without any Python serialization overhead.
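For reference, a minimal sketch of a UDF that would work: the key difference is that it returns plain Python lists and tuples matching the declared schema, never Column expressions.
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

h_names = [f"h_{i}" for i in range(24)]

# Inside a UDF, return plain Python objects (here a list of
# (nome, [h_0..h_23]) tuples), not F.array/F.struct expressions.
concat_udf = F.udf(
    lambda arr: [(x["nome"], [x[h] for h in h_names]) for x in arr],
    ArrayType(StructType([
        StructField("nome", StringType(), True),
        StructField("circadiana", ArrayType(StringType()), True),
    ])),
)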
First, get the names of all the h_x fields present in the struct:
import pyspark.sql.functions as F
# Sort numerically so the hours come out as h_0, h_1, ..., h_23 rather than
# in the struct's lexicographic field order (h_0, h_1, h_10, h_11, ...)
h_fields = sorted([c for c in df.select(F.expr("inline(area)")).columns if c != "nome"],
                  key=lambda c: int(c.split("_")[1]))
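With the sample schema this yields the 24 names in hour order: ['h_0', 'h_1', 'h_2', ..., 'h_23'].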
Then, use the transform function on the area array: for each element, we create a struct with two fields. The first field holds nome, and the second is the array circadiana built from all the other fields (h_0, ...):
transform_expr = f"""
transform(area,
x -> struct(
x.nome as nome,
array({','.join([f'x.{c}' for c in h_fields])}) as circadiana
)
)
"""
df1 = df.withColumn("area", F.expr(transform_expr))
df1.printSchema()
#root
# |-- area: array (nullable = true)
# | |-- element: struct (containsNull = false)
# | | |-- nome: string (nullable = true)
# | | |-- circadiana: array (nullable = false)
# | | | |-- element: string (containsNull = true)
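To sanity-check end to end, here is a self-contained sketch that rebuilds the question's sample data and applies the expression above (the SparkSession setup and the JSON round-trip are just one convenient way to recreate the nested schema; it assumes h_fields and transform_expr have been built as shown against this df):
import json
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Rebuild the question's sample record as a single JSON document
extra = [0,0,0,0,0,0,1,0,0,0,1,1,0,1,0,0,0,1,0,1,0,1,0,1]
roma = [130,94,116,61,54,47,58,57,87,0,0,0,0,0,0,0,0,0,0,0,0,0,0,124]
sample = {"area": [
    {"nome": "Extra", **{f"h_{i}": str(v) for i, v in enumerate(extra)}},
    {"nome": "ROMA CAP", **{f"h_{i}": str(v) for i, v in enumerate(roma)}},
]}
df = spark.read.json(spark.sparkContext.parallelize([json.dumps(sample)]))

df1 = df.withColumn("area", F.expr(transform_expr))
df1.selectExpr("inline(area)").show(truncate=False)
# nome=Extra    -> circadiana = [0, 0, 0, ..., 1]       (24 strings, hour order)
# nome=ROMA CAP -> circadiana = [130, 94, 116, ..., 124]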
If the list of h_x fields is fixed, you can actually just use:
transform_expr = f"""
transform(area,
x -> struct(
x.nome as nome,
array({','.join([f'x.h_{i}' for i in range(24)])}) as circadiana
)
)
"""