使用 json 数据应用过滤条件

Apply the filter condition using the json data

我正在使用 UI 生成 JSON 数据,并且 UI 用于获取用户的输入并生成包含输入的 JSON 文件.然后我们使用JSON文件数据,根据两列(countrycurrency)筛选出记录。

json__data = {
"Year": 2019,
"EXCHANGE":
    [
        {
            "total": 142250.0,
            "COUNTRY":"JPN",
            "CUR": "YEN"
        },
        {
            "total": 210.88999999999996,
            "COUNTRY": "US"
            "CUR": "DOL"
        },
        {
            "total": 1065600.0,
            "COUNTRY": "UK" 
            "CUR": "POU"
        }
]

}

以上是 json 文件示例。我有另一个文件 demo.csv,它有多个列,例如(NameCurrencyContact_detailsCurrency 等)。我只需要包含在 json 文件中的记录。例如-我需要来自国家(USUKJPN)和货币(DOLPOUYEN)的记录。我需要逻辑来实现这个条件。

import json
from pyspark.sql import SparkSession

class Test(object):
    def __init__(self, json__data,a):
         self.a = []
         self.dict = json.loads(json__data)
          for data in dict:
               a.append(data['COUNTRY'])          #a= [US, UK, JPN] 
               b.append(data['CUR'])              #b= [DOL, POU, YEN]
        
def spark(self,a):
     ss = SparkSession.builder.master("local[1]").appName("This is sample csv").getOrCreate()
     df = ss.read.format("csv").load("C:\Users\demo.csv")
     dt = spark.sql(SELECT * FROM df where col("COUNTRY") IN a  and col("CURRENCY") IN b)  # I need all records satisfying given filter conditions
     ds = dt.show()

您可以像这样简单地使用 filter

import pyspark.sql.functions as F

df1 = df.filter(F.col("COUNTRY").isin(a) & F.col("CURRENCY").isin(b))

或者您可以使用 spark 读取 json 文件,然后与来自 csv 的其他数据帧连接:

df_json = spark.read.json("path_to_json")
df_csv = spark.read.csv("path_to_csv")

result = df_csv.join(
    df_json.select("COUNTRY", "CURRENCY"),
    on=["COUNTRY", "CURRENCY"],
    how="inner"
)