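Nested JSON from REST API to PySpark DataFrame
I am trying to build a data pipeline in which I request data from a REST API. The output is a nicely nested JSON document, and I want to read it into a PySpark DataFrame. When I save the file locally and use the following code, this works fine: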
from pyspark.sql import *
from pyspark.sql.functions import *
spark = SparkSession\
.builder\
.appName("jsontest")\
.getOrCreate()
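# multiLine=True lets Spark parse a JSON document that spans several lines
raw_df = spark.read.json(r"my_json_path", multiLine=True)
But when I try to create a PySpark DataFrame directly after making the API request, I get an error.
I use the following code for the REST API call and the conversion to a PySpark DataFrame: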
apiCallHeaders = {'Authorization': 'Bearer ' + bearer_token}
apiCallResponse = requests.get(data_url, headers=apiCallHeaders, verify=True)
json_rdd = spark.sparkContext.parallelize(apiCallResponse.text)
raw_df = spark.read.json(json_rdd)
Below is part of the response output:
{"networks":[{"href":"/v2/networks/velobike-moscow","id":"velobike-moscow","name":"Velobike"},{"href":"/v2/networks/bycyklen","id":"bycyklen","name":"Bycyklen"},{"href":"/v2/networks/nu-connect","id":"nu-connect","name":"Nu-Connect"},{"href":"/v2/networks/baerum-bysykkel","id":"baerum-bysykkel","name":"Bysykkel"},{"href":"/v2/networks/bysykkelen","id":"bysykkelen","name":"Bysykkelen"},{"href":"/v2/networks/onroll-a-rua","id":"onroll-a-rua","name":"Onroll"},{"href":"/v2/networks/onroll-albacete","id":"onroll-albacete","name":"Onroll"},{"href":"/v2/networks/onroll-alhama-de-murcia","id":"onroll-alhama-de-murcia","name":"Onroll"},{"href":"/v2/networks/onroll-almunecar","id":"onroll-almunecar","name":"Onroll"},{"href":"/v2/networks/onroll-antequera","id":"onroll-antequera","name":"Onroll"},{"href":"/v2/networks/onroll-aranda-de-duero","id":"onroll-aranda-de-duero","name":"Onroll"}
I hope my question makes sense and that someone can help.
Thanks in advance!
After this, you can add the following lines:
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
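And to run your code, you have to add [ ] here, so that the response text is passed to parallelize as a one-element list rather than as a bare string: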
rdd = spark.sparkContext.parallelize([apiCallResponse.text])
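The reason the brackets matter is that parallelize iterates over whatever it is given, and iterating over a Python string yields one character at a time. The sketch below illustrates this in plain Python, without Spark; `response_text` is a hypothetical, shortened stand-in for `apiCallResponse.text`, and `is_valid_json` is a helper introduced only for this illustration.

```python
import json

# Hypothetical stand-in for apiCallResponse.text (a shortened response body).
response_text = '{"networks":[{"id":"bycyklen","name":"Bycyklen"}]}'

# Iterating over a bare string yields one record per character.
records_without_brackets = list(response_text)

# Iterating over a one-element list yields a single record
# that holds the whole JSON document.
records_with_brackets = list([response_text])


def is_valid_json(record):
    """Return True if the record parses as a JSON document."""
    try:
        json.loads(record)
        return True
    except ValueError:
        return False


# One record per character versus exactly one record.
print(len(records_without_brackets), len(records_with_brackets))
# The single record is a complete JSON document.
print(all(is_valid_json(r) for r in records_with_brackets))
# None of the single-character records in this sample is valid JSON.
print(any(is_valid_json(r) for r in records_without_brackets))
```

With the brackets, the JSON reader receives one complete document; without them, it receives many one-character records that it cannot parse.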
See this example:
import requests
response = requests.get('http://api.citybik.es/v2/networks?fields=id,name,href')
rdd = spark.sparkContext.parallelize([response.text])
df = spark.read.json(rdd)
df.printSchema()
# root
#  |-- networks: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- href: string (nullable = true)
#  |    |    |-- id: string (nullable = true)
#  |    |    |-- name: string (nullable = true)
(df
 .selectExpr('inline(networks)')
 .show(n=5, truncate=False))
# +----------------------------+---------------+----------+
# |href                        |id             |name      |
# +----------------------------+---------------+----------+
# |/v2/networks/velobike-moscow|velobike-moscow|Velobike  |
# |/v2/networks/bycyklen       |bycyklen       |Bycyklen  |
# |/v2/networks/nu-connect     |nu-connect     |Nu-Connect|
# |/v2/networks/baerum-bysykkel|baerum-bysykkel|Bysykkel  |
# |/v2/networks/bysykkelen     |bysykkelen     |Bysykkelen|
# +----------------------------+---------------+----------+