使用 Pyspark 从 ORC 中提取数据

Extract data out of ORC using Pyspark

我有一个 ORC 文件,我可以使用 Pyspark 2.2.0 将其读入 DataFrame

from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

df = spark.read.orc("s3://leadid-sandbox/krish/lead_test/")

上面的 df 具有如下架构

root
 |-- item: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

样本数据如下所示(只是一个样本数据,不是整个数据集)

item
{http_Accept-Language={"s":"en-US"}, Win64={"n":"1"}, 
geoip_region={"s":"FL"}, Platform={"s":"Win7"}, geoip_postal_code=
{"s":"33432"}, JavaApplets={"n":"1"}, http_Accept={"s":"*/*"}, 
Version={"s":"11.0"}, Cookies={"n":"1"}, Platform_Version=
{"s":"6.1"}, http_Content-Type={"s":"application/x-www-form-
urlencoded"}}
{http_Accept-Language={"s":"en-US"}, Win64={"n":"1"}, IFrames=
{"n":"1"}, geoip_region={"s":"CA"}, Platform={"s":"Win7"}, Parent=
{"s":"IE 11.0"}, http_Dnt={"n":"1"}} 

所以我爆了"item"如下

expDf = df.select(explode("item"))

上面的 DataFrame 有下面的模式,当我做一个节目时(2)有下面的细节

root
 |-- key: string (nullable = false)
 |-- value: string (nullable = true)

+------------+----------+
| key| value|
+------------+----------+
|geoip_region|
{
    "s": "FL"
}
|
| Tables|
{
   "n": "1"
}
|
+------------+----------+

如何从这个 DataFrame 中 select 获取数据?我尝试了不同的方法但没有用。 所以我需要 'geoip_region' 的值为 'FL' 等等。 感谢任何帮助。

我不确定你的完整用例,但如果只是访问 "item" 中的键和值,你可以使用以下示例代码来实现:

row = df.select(df.item).collect()

上面一行会给你一个 Row 对象的列表,比如 [Row(item={http_Accept-Language={"s":"en-US"}, Win64={"n":"1"},....})]

然后select行内的所有值你可以做:row_item = row[0]['item']

row_item['http_Accept'] 将使您可以访问 u"{"s":"en-US"}"

eval(row_item['http_Accept']) 会给你一个字典,你可以从那里得到它的键,值

我刚才大概讲了流程,可以写成循环,迭代得到所有的key/values。

感谢 Joshi 的回复,出于某种原因,我在我的代码中收到行 [0] 未找到错误,我在 AWS Glue 环境中 运行 这可能是一个原因。

我用下面的代码得到了我想要的。

# Creating a DataFrame of the raw file

df = spark.read.orc("s3://leadid-sandbox/krish/lead_test/")

# Creating a temp view called Leads for the above dataFrame
df.createOrReplaceTempView("leads")

# Extracting the data using normal SQL from the above created Temp 
  View
tblSel = spark.sql("SELECT get_json_object(item['token'], '$.s') as 
token, get_json_object(item['account_code'], '$.s') as account_code 
from leads").show()