使用 json 数据应用过滤条件
Apply the filter condition using the json data
我正在使用 UI 生成 JSON 数据,并且 UI 用于获取用户的输入并生成包含输入的 JSON 文件.然后我们使用JSON文件数据,根据两列(country
和currency
)筛选出记录。
json__data = {
"Year": 2019,
"EXCHANGE":
[
{
"total": 142250.0,
"COUNTRY":"JPN",
"CUR": "YEN"
},
{
"total": 210.88999999999996,
"COUNTRY": "US"
"CUR": "DOL"
},
{
"total": 1065600.0,
"COUNTRY": "UK"
"CUR": "POU"
}
]
}
以上是 json 文件示例。我有另一个文件 demo.csv
,它有多个列,例如(Name
、Currency
、Contact_details
、Currency
等)。我只需要包含在 json 文件中的记录。例如-我需要来自国家(US
、UK
、JPN
)和货币(DOL
、POU
、YEN
)的记录。我需要逻辑来实现这个条件。
import json
from pyspark.sql import SparkSession
class Test(object):
def __init__(self, json__data,a):
self.a = []
self.dict = json.loads(json__data)
for data in dict:
a.append(data['COUNTRY']) #a= [US, UK, JPN]
b.append(data['CUR']) #b= [DOL, POU, YEN]
def spark(self,a):
ss = SparkSession.builder.master("local[1]").appName("This is sample csv").getOrCreate()
df = ss.read.format("csv").load("C:\Users\demo.csv")
dt = spark.sql(SELECT * FROM df where col("COUNTRY") IN a and col("CURRENCY") IN b) # I need all records satisfying given filter conditions
ds = dt.show()
您可以像这样简单地使用 filter
:
import pyspark.sql.functions as F
df1 = df.filter(F.col("COUNTRY").isin(a) & F.col("CURRENCY").isin(b))
或者您可以使用 spark 读取 json 文件,然后与来自 csv 的其他数据帧连接:
df_json = spark.read.json("path_to_json")
df_csv = spark.read.csv("path_to_csv")
result = df_csv.join(
df_json.select("COUNTRY", "CURRENCY"),
on=["COUNTRY", "CURRENCY"],
how="inner"
)
我正在使用 UI 生成 JSON 数据,并且 UI 用于获取用户的输入并生成包含输入的 JSON 文件.然后我们使用JSON文件数据,根据两列(country
和currency
)筛选出记录。
json__data = {
"Year": 2019,
"EXCHANGE":
[
{
"total": 142250.0,
"COUNTRY":"JPN",
"CUR": "YEN"
},
{
"total": 210.88999999999996,
"COUNTRY": "US"
"CUR": "DOL"
},
{
"total": 1065600.0,
"COUNTRY": "UK"
"CUR": "POU"
}
]
}
以上是 json 文件示例。我有另一个文件 demo.csv
,它有多个列,例如(Name
、Currency
、Contact_details
、Currency
等)。我只需要包含在 json 文件中的记录。例如-我需要来自国家(US
、UK
、JPN
)和货币(DOL
、POU
、YEN
)的记录。我需要逻辑来实现这个条件。
import json
from pyspark.sql import SparkSession
class Test(object):
def __init__(self, json__data,a):
self.a = []
self.dict = json.loads(json__data)
for data in dict:
a.append(data['COUNTRY']) #a= [US, UK, JPN]
b.append(data['CUR']) #b= [DOL, POU, YEN]
def spark(self,a):
ss = SparkSession.builder.master("local[1]").appName("This is sample csv").getOrCreate()
df = ss.read.format("csv").load("C:\Users\demo.csv")
dt = spark.sql(SELECT * FROM df where col("COUNTRY") IN a and col("CURRENCY") IN b) # I need all records satisfying given filter conditions
ds = dt.show()
您可以像这样简单地使用 filter
:
import pyspark.sql.functions as F
df1 = df.filter(F.col("COUNTRY").isin(a) & F.col("CURRENCY").isin(b))
或者您可以使用 spark 读取 json 文件,然后与来自 csv 的其他数据帧连接:
df_json = spark.read.json("path_to_json")
df_csv = spark.read.csv("path_to_csv")
result = df_csv.join(
df_json.select("COUNTRY", "CURRENCY"),
on=["COUNTRY", "CURRENCY"],
how="inner"
)