Remove selective JSON objects from a JSON DF array using PySpark
I want to remove multiple JSON objects from a JSON array; my source JSON is in the format shown below.
I have a list containing the device IDs that need to be kept in the JSON array; the rest need to be removed.
For example, as shown in my source JSON, I have 3 dev_id values: 100010100, 200020200 and 300030300.
I have a Python list device_id_list=[200020200,300030300], so my final JSON array should contain only those 2 JSON objects; the object with dev_id=100010100 should be removed, as shown in the output JSON.
I tried an option that may not be optimal: my approach was to read the JSON as a string rather than as JSON, as shown below.
df = spark.read.text("path\iot-sensor.json")
df:pyspark.sql.dataframe.DataFrame
value:string
I wrote a UDF to remove the JSON objects whose dev_id is not present in device_id_list. It removes the dev_id entries that are not in the list and returns the JSON as a string.
I want to convert this string, i.e. DataFrame df2, back to JSON with the same schema as the source JSON (df2:pyspark.sql.dataframe.DataFrame = [iot_station: array] (source schema)), because the source and output JSON schemas should be identical. Please share if there is a better solution.
UDF:
import json
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def drop_dev_id(jsonResponse, dict_keys):
    try:
        data = json.loads(jsonResponse)
        i = 0
        n = len(data['iot_station'])
        # drop array entries whose dev_id is not in the keep-list
        while i < n:
            if data['iot_station'][i]["dev_id"] not in dict_keys:
                data['iot_station'].pop(i)
                n -= 1
            else:
                i += 1
        # return the filtered JSON as a string, matching the declared StringType
        return json.dumps(data)
    except Exception as e:
        print('Exception --> ' + str(e))

def drop_dev_id_udf(dict_keys):
    return udf(lambda row: drop_dev_id(row, dict_keys), StringType())

df2 = df.select('value', drop_dev_id_udf(dict_keys)('value')).select('<lambda>(value)')
df2:pyspark.sql.dataframe.DataFrame
<lambda>(value):string
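If you stay with this UDF approach, one way to get df2 back to the source schema is to parse the string column with from_json, reusing the schema inferred from the source file. A minimal sketch; the names json_str, source_schema and df3 are illustrative, not from the original post:

from pyspark.sql import functions as F

# infer the source schema once by reading the original file as JSON
source_schema = spark.read.json("path/iot-sensor.json", multiLine=True).schema

# parse the UDF's string output back into a struct with that schema,
# then expand it so the result has the source layout [iot_station: array]
df3 = (df2
       .withColumnRenamed('<lambda>(value)', 'json_str')
       .withColumn('parsed', F.from_json('json_str', source_schema))
       .select('parsed.*'))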
Source JSON:
{
  "iot_station": [
    {
      "dev_id": 100010100,
      "device1": dev_val1,
      "device2": "dev_val2",
      "device3": dev_val3,
      "device4": "dev_val4",
      "stationid": [
        {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      ],
      "geospat": {
        "id": id_val,
        "idrs": idrs_val,
        "idrq": "idrq_val",
        "idrx": "idrx_val"
      }
    },
    {
      "dev_id": 200020200,
      "device1": dev_val1,
      "device2": "dev_val2",
      "device3": dev_val3,
      "device4": "dev_val4",
      "stationid": [
        {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      ],
      "geospat": {
        "id": id_val,
        "idrs": idrs_val,
        "idrq": "idrq_val",
        "idrx": "idrx_val"
      }
    },
    {
      "dev_id": 300030300,
      "device1": dev_val1,
      "device2": "dev_val2",
      "device3": dev_val3,
      "device4": "dev_val4",
      "stationid": [
        {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      ],
      "geospat": {
        "id": id_val,
        "idrs": idrs_val,
        "idrq": "idrq_val",
        "idrx": "idrx_val"
      }
    }
  ]
}
Output JSON:
{
  "iot_station": [
    {
      "dev_id": 200020200,
      "device1": dev_val1,
      "device2": "dev_val2",
      "device3": dev_val3,
      "device4": "dev_val4",
      "stationid": [
        {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      ],
      "geospat": {
        "id": id_val,
        "idrs": idrs_val,
        "idrq": "idrq_val",
        "idrx": "idrx_val"
      }
    },
    {
      "dev_id": 300030300,
      "device1": dev_val1,
      "device2": "dev_val2",
      "device3": dev_val3,
      "device4": "dev_val4",
      "stationid": [
        {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      ],
      "geospat": {
        "id": id_val,
        "idrs": idrs_val,
        "idrq": "idrq_val",
        "idrx": "idrx_val"
      }
    }
  ]
}
You don't need a UDF to achieve what you want here. Just load the file normally as JSON instead of text and use the filter function on the array column iot_station:
from pyspark.sql import functions as F
df = spark.read.json("path/iot-sensor.json", multiLine=True)
device_id_list = [str(i) for i in [200020200, 300030300]]
df1 = df.withColumn(
    "iot_station",
    F.expr(f"""
        filter(
            iot_station,
            x -> x.dev_id in ({','.join(device_id_list)})
        )
    """)
)
# check filtered json
df1.select(F.col("iot_station").getItem("dev_id").alias("dev_id")).show(truncate=False)
#+----------------------+
#|dev_id |
#+----------------------+
#|[200020200, 300030300]|
#+----------------------+
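If you also need to write the filtered result back out, a plain JSON write keeps the same [iot_station: array] schema as the source. On Spark 3.1+ the same filter can be expressed with the DataFrame API instead of a SQL expression string. A sketch; the output path is only an example:

# equivalent filter using the column-function API (Spark 3.1+),
# which avoids building the IN list as a string
device_ids = [200020200, 300030300]
df1 = df.withColumn(
    "iot_station",
    F.filter(F.col("iot_station"), lambda x: x["dev_id"].isin(device_ids))
)

# write the result back out; the schema stays [iot_station: array], same as the source
df1.write.mode("overwrite").json("path/iot-sensor-filtered")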