Filter nested JSON structure and get field names as values in Pyspark
I have the following complex data that I want to parse in PySpark:
records = '[{"segmentMembership":{"ups":{"FF6KCPTR6AQ0836R":{"lastQualificationTime":"2021-01-16 22:05:11.074357","status":"exited"},"QMS3YRT06JDEUM8O":{"lastQualificationTime":"2021-01-16 22:05:11.074357","status":"realized"},"8XH45RT87N6ZV4KQ":{"lastQualificationTime":"2021-01-16 22:05:11.074357","status":"exited"}}},"_aepgdcdevenablement2":{"emailId":{"address":"stuff@someemail.com"},"person":{"name":{"firstName":"Name2"}},"identities":{"customerid":"PH25PEUWOTA7QF93"}}},{"segmentMembership":{"ups":{"FF6KCPTR6AQ0836R":{"lastQualificationTime":"2021-01-16 22:05:11.074457","status":"realized"},"D45TOO8ZUH0B7GY7":{"lastQualificationTime":"2021-01-16 22:05:11.074457","status":"realized"},"QMS3YRT06JDEUM8O":{"lastQualificationTime":"2021-01-16 22:05:11.074457","status":"existing"}}},"_aepgdcdevenablement2":{"emailId":{"address":"stuff4@someemail.com"},"person":{"name":{"firstName":"TestName"}},"identities":{"customerid":"9LAIHVG91GCREE3Z"}}}]'
df = spark.read.json(sc.parallelize([records]))
df.show()
df.printSchema()
The problem I have is with the segmentMembership object. The JSON object looks like this:
"segmentMembership": {
    "ups": {
        "FF6KCPTR6AQ0836R": {
            "lastQualificationTime": "2021-01-16 22:05:11.074357",
            "status": "exited"
        },
        "QMS3YRT06JDEUM8O": {
            "lastQualificationTime": "2021-01-16 22:05:11.074357",
            "status": "realized"
        },
        "8XH45RT87N6ZV4KQ": {
            "lastQualificationTime": "2021-01-16 22:05:11.074357",
            "status": "exited"
        }
    }
}
The annoying part is that the keys ("FF6KCPTR6AQ0836R", "QMS3YRT06JDEUM8O", "8XH45RT87N6ZV4KQ") end up being defined as columns (fields of the ups struct) in PySpark.
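In other words, each segment ID is a data value stored as a dictionary key, not a fixed attribute. As an illustration outside Spark, here is a plain-Python sketch with no dependencies. The helper name segment_status_pairs and the trimmed RECORDS literal (the two sample records reduced to customerid and each segment's status) are assumptions made for this sketch. Flattening ups into one (customerid, segment_id, status) tuple per segment gives a "long" shape in which filtering on status is straightforward:

```python
# Trimmed copy of the two sample records: only customerid and each segment's
# status are kept (lastQualificationTime and the other profile fields omitted).
RECORDS = [
    {
        "segmentMembership": {"ups": {
            "FF6KCPTR6AQ0836R": {"status": "exited"},
            "QMS3YRT06JDEUM8O": {"status": "realized"},
            "8XH45RT87N6ZV4KQ": {"status": "exited"},
        }},
        "_aepgdcdevenablement2": {"identities": {"customerid": "PH25PEUWOTA7QF93"}},
    },
    {
        "segmentMembership": {"ups": {
            "FF6KCPTR6AQ0836R": {"status": "realized"},
            "D45TOO8ZUH0B7GY7": {"status": "realized"},
            "QMS3YRT06JDEUM8O": {"status": "existing"},
        }},
        "_aepgdcdevenablement2": {"identities": {"customerid": "9LAIHVG91GCREE3Z"}},
    },
]


def segment_status_pairs(record):
    """Hypothetical helper: one (customerid, segment_id, status) tuple per key in ups."""
    customer_id = record["_aepgdcdevenablement2"]["identities"]["customerid"]
    ups = record["segmentMembership"]["ups"]
    # The segment IDs are simply the keys of the ups dict.
    return [(customer_id, seg_id, details["status"]) for seg_id, details in ups.items()]


for rec in RECORDS:
    for row in segment_status_pairs(rec):
        print(row)
```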
Ultimately, I would like to get a result like the following if a segment's status is "exited":
+--------------------+----------------+---------+------------------+
|address |customerid |firstName|segment_id |
+--------------------+----------------+---------+------------------+
|stuff@someemail.com |PH25PEUWOTA7QF93|Name2 |[8XH45RT87N6ZV4KQ]|
|stuff4@someemail.com|9LAIHVG91GCREE3Z|TestName |[8XH45RT87N6ZV4KQ]|
+--------------------+----------------+---------+------------------+
After loading the data into a DataFrame (as above), I tried the following:
from pyspark.sql.functions import array, lit, udf
from pyspark.sql.types import ArrayType, StringType
dfx = df.select("_aepgdcdevenablement2.emailId.address", "_aepgdcdevenablement2.identities.customerid", "_aepgdcdevenablement2.person.name.firstName", "segmentMembership.ups")
dfx.show(truncate=False)
seg_list = array(*[lit(k) for k in ["8XH45RT87N6ZV4KQ", "QMS3YRT06JDEUM8O"]])
print(seg_list)
# if v["status"] in ['existing', 'realized']
def confusing_compare(ups, seg_list):
    seg_id_filtered_d = dict((k, ups[k]) for k in seg_list if k in ups)
    # This is the line I am having a problem with.
    # seg_id_status_filtered_d = {key for key, value in seg_id_filtered_d.items() if v["status"] in ['existing', 'realized']}
    return list(seg_id_filtered_d)
final_conf_dx_pred = udf(confusing_compare, ArrayType(StringType()))
result_df = dfx.withColumn("segment_id", final_conf_dx_pred(dfx.ups, seg_list)).select("address", "customerid", "firstName", "segment_id")
result_df.show(truncate=False)
I can't figure out how to check the status field inside the dict values.
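The commented-out line fails because it tests v["status"], but v is never defined (the loop variable is value). A segment that is absent from a given row also comes through as None, so value["status"] would then raise. Below is a minimal plain-Python sketch of the intended check. It assumes ups has already been turned into a dict; inside the UDF, calling ups.asDict(recursive=True) on the incoming Row should give this shape. The name filter_segments and the keep_statuses parameter are hypothetical:

```python
def filter_segments(ups, seg_list, keep_statuses=("existing", "realized")):
    """Return the IDs in seg_list whose status is one of keep_statuses.

    ups: dict mapping segment ID -> {"status": ...}, or -> None when the
    segment is absent from this row (Spark fills missing struct fields with null).
    """
    # Same first step as confusing_compare: keep only the requested IDs.
    seg_id_filtered_d = {k: ups[k] for k in seg_list if k in ups}
    # Test value["status"] (not the undefined v) and skip null entries.
    return [
        key
        for key, value in seg_id_filtered_d.items()
        if value is not None and value["status"] in keep_statuses
    ]


# The ups values of the two sample records as dicts; each row carries every
# segment field from the inferred schema, with None where a segment is missing.
ups_1 = {
    "8XH45RT87N6ZV4KQ": {"status": "exited"},
    "D45TOO8ZUH0B7GY7": None,
    "FF6KCPTR6AQ0836R": {"status": "exited"},
    "QMS3YRT06JDEUM8O": {"status": "realized"},
}
ups_2 = {
    "8XH45RT87N6ZV4KQ": None,
    "D45TOO8ZUH0B7GY7": {"status": "realized"},
    "FF6KCPTR6AQ0836R": {"status": "realized"},
    "QMS3YRT06JDEUM8O": {"status": "existing"},
}
seg_list = ["8XH45RT87N6ZV4KQ", "QMS3YRT06JDEUM8O"]
print(filter_segments(ups_1, seg_list))  # ['QMS3YRT06JDEUM8O']
print(filter_segments(ups_2, seg_list))  # ['QMS3YRT06JDEUM8O']
```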
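You can actually do this without a UDF. Here, I take all the segment names present in the schema and filter out the ones with status = 'exited'. You can adapt this to the segments and statuses you need.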
First, use the schema fields to get the list of all segment names, like this:
segment_names = df.select("segmentMembership.ups.*").schema.fieldNames()
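spark.read.json infers one schema for all records, so ups becomes a single struct whose fields are the union of the segment IDs seen in any record. The field order in the output at the end of this answer is consistent with those fields being sorted by name. Here is a plain-Python approximation of that list. The function name inferred_segment_names is hypothetical, and the sorting mirrors the observed output order rather than a guarantee quoted from the Spark documentation:

```python
# Only the segmentMembership.ups objects of the two sample records matter here.
UPS_PER_RECORD = [
    {
        "FF6KCPTR6AQ0836R": {"status": "exited"},
        "QMS3YRT06JDEUM8O": {"status": "realized"},
        "8XH45RT87N6ZV4KQ": {"status": "exited"},
    },
    {
        "FF6KCPTR6AQ0836R": {"status": "realized"},
        "D45TOO8ZUH0B7GY7": {"status": "realized"},
        "QMS3YRT06JDEUM8O": {"status": "existing"},
    },
]


def inferred_segment_names(ups_per_record):
    """Union of the segment IDs across all records, sorted by name."""
    names = set()
    for ups in ups_per_record:
        names.update(ups)  # iterating a dict yields its keys
    return sorted(names)


print(inferred_segment_names(UPS_PER_RECORD))
# ['8XH45RT87N6ZV4KQ', 'D45TOO8ZUH0B7GY7', 'FF6KCPTR6AQ0836R', 'QMS3YRT06JDEUM8O']
```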
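Then, by iterating over the list created above with the when function, you can build, for each segment, a column whose value is either the segment name or null, depending on its status. A segment that is missing from a given row also yields null, because comparing a null status with "exited" returns null and when() without otherwise() returns null: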
from pyspark.sql.functions import array, col, expr, lit, when
active_segments = [
    when(col(f"segmentMembership.ups.{c}.status") != lit("exited"), lit(c))
    for c in segment_names
]
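Each when(...) expression yields either the segment name or null. Here is a plain-Python emulation of the values produced for one row. It assumes the sorted segment names from the sample data and mimics SQL null semantics: a missing segment is a null struct, NULL != 'exited' evaluates to NULL, and when() without otherwise() returns NULL. The function name active_segment_values is hypothetical:

```python
SEGMENT_NAMES = ["8XH45RT87N6ZV4KQ", "D45TOO8ZUH0B7GY7", "FF6KCPTR6AQ0836R", "QMS3YRT06JDEUM8O"]


def active_segment_values(ups, segment_names):
    """One entry per segment name: the name if its status is not 'exited', else None."""
    values = []
    for name in segment_names:
        details = ups.get(name)  # None plays the role of a null struct
        status = None if details is None else details["status"]
        # SQL: NULL != 'exited' is NULL, which when() treats as "not true".
        values.append(name if status is not None and status != "exited" else None)
    return values


# ups objects of the two sample records (lastQualificationTime omitted).
ups_1 = {
    "FF6KCPTR6AQ0836R": {"status": "exited"},
    "QMS3YRT06JDEUM8O": {"status": "realized"},
    "8XH45RT87N6ZV4KQ": {"status": "exited"},
}
ups_2 = {
    "FF6KCPTR6AQ0836R": {"status": "realized"},
    "D45TOO8ZUH0B7GY7": {"status": "realized"},
    "QMS3YRT06JDEUM8O": {"status": "existing"},
}
print(active_segment_values(ups_1, SEGMENT_NAMES))
# [None, None, None, 'QMS3YRT06JDEUM8O']
print(active_segment_values(ups_2, SEGMENT_NAMES))
# [None, 'D45TOO8ZUH0B7GY7', 'FF6KCPTR6AQ0836R', 'QMS3YRT06JDEUM8O']
```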
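Finally, add a new array column segments and use the filter function to remove the null elements (the ones corresponding to status 'exited'). The filter higher-order function is available in Spark SQL expressions from Spark 2.4: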
dfx = df.withColumn("segments", array(*active_segments)) \
    .withColumn("segments", expr("filter(segments, x -> x is not null)")) \
    .select(
        col("_aepgdcdevenablement2.emailId.address"),
        col("_aepgdcdevenablement2.identities.customerid"),
        col("_aepgdcdevenablement2.person.name.firstName"),
        col("segments").alias("segment_id")
    )
dfx.show(truncate=False)
#+--------------------+----------------+---------+------------------------------------------------------+
#|address |customerid |firstName|segment_id |
#+--------------------+----------------+---------+------------------------------------------------------+
#|stuff@someemail.com |PH25PEUWOTA7QF93|Name2 |[QMS3YRT06JDEUM8O] |
#|stuff4@someemail.com|9LAIHVG91GCREE3Z|TestName |[D45TOO8ZUH0B7GY7, FF6KCPTR6AQ0836R, QMS3YRT06JDEUM8O]|
#+--------------------+----------------+---------+------------------------------------------------------+
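To check the logic end to end, here is a plain-Python emulation of the whole pipeline on the two sample records (profile fields kept, lastQualificationTime dropped). The function segment_rows and its keep parameter are hypothetical. keep stands in for the condition inside when(), so passing a predicate such as status == "exited" shows how the answer could be adapted to keep only exited segments; in PySpark that would correspond to when(col(f"segmentMembership.ups.{c}.status") == lit("exited"), lit(c)):

```python
RECORDS = [
    {
        "segmentMembership": {"ups": {
            "FF6KCPTR6AQ0836R": {"status": "exited"},
            "QMS3YRT06JDEUM8O": {"status": "realized"},
            "8XH45RT87N6ZV4KQ": {"status": "exited"},
        }},
        "_aepgdcdevenablement2": {
            "emailId": {"address": "stuff@someemail.com"},
            "person": {"name": {"firstName": "Name2"}},
            "identities": {"customerid": "PH25PEUWOTA7QF93"},
        },
    },
    {
        "segmentMembership": {"ups": {
            "FF6KCPTR6AQ0836R": {"status": "realized"},
            "D45TOO8ZUH0B7GY7": {"status": "realized"},
            "QMS3YRT06JDEUM8O": {"status": "existing"},
        }},
        "_aepgdcdevenablement2": {
            "emailId": {"address": "stuff4@someemail.com"},
            "person": {"name": {"firstName": "TestName"}},
            "identities": {"customerid": "9LAIHVG91GCREE3Z"},
        },
    },
]


def segment_rows(records, keep=lambda status: status != "exited"):
    """Return (address, customerid, firstName, segment_id) tuples, one per record."""
    # Union of segment IDs, sorted, like the fields of the inferred ups struct.
    names = sorted({seg for r in records for seg in r["segmentMembership"]["ups"]})
    rows = []
    for r in records:
        ups = r["segmentMembership"]["ups"]
        profile = r["_aepgdcdevenablement2"]
        # array(*active_segments): one slot per name, None when absent or not kept.
        slots = [n if n in ups and keep(ups[n]["status"]) else None for n in names]
        # filter(segments, x -> x is not null)
        segment_id = [s for s in slots if s is not None]
        rows.append((
            profile["emailId"]["address"],
            profile["identities"]["customerid"],
            profile["person"]["name"]["firstName"],
            segment_id,
        ))
    return rows


for row in segment_rows(RECORDS):
    print(row)
```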