Loading many JSON files with nested data structures to form latitude and longitude plot using Dask
I stumbled upon this video showing how you can load many JSON files and extract the data into a Dask dataframe. It looked ideal for something I've been wanting to do. I have a lot of JSON files in sub-directories (one for each day). Each JSON file holds trip data for a vehicle, actually sub-trips, which consist of multiple start and end positions. A file can have multiple SubTrips in a JSON array, and there can be multiple StartPosition entries in a SubTrips. StartPosition is optional within SubTrips. Each StartPosition contains a Latitude and Longitude. The data (all on one line in the file) looks something like this (also see this sample file):
[{"SubTrips":[{"StartPosition":{"IsAvl":false,"Source":"Gps","OdometerKilometres":147918,"DistanceSinceReadingKilometres":0,"AgeOfReadingSeconds":9528,"Pdop":0,"Vdop":0,"Hdop":1,"NumberOfSatellites":10,"Heading":0,"AltitudeMetres":38,"SpeedKilometresPerHour":0,"Longitude":18.4726696014404,"Latitude":-33.8071098327637,"Timestamp":"2019-01-01T06:31:51Z","DriverId":-9119609258885412605,"AssetId":-5651694037363791566,"PositionId":816711615055360000},"EndPosition":{
I was able to read the JSON files using:
import json
import dask.bag as db
lines = db.read_text("C:/Trips/2019*/*.json")
records = lines.map(json.loads)
However, I am not able to extract the nested JSON data structure into a Dask dataframe, with one row per StartPosition record from the JSON files. The YouTube video only shows one level of JSON, while I have multiple levels and multiple arrays of data. The multiple levels seem doable, but I fail on the arrays. There is also the issue that StartPosition is optional, which is worked around with the filter below. I can do something like the following, but this only takes the first sub-trip in a file's array, and the first start position of sub-trips that consist of multiple start positions:
records.filter(lambda d: "StartPosition" in d[0]["SubTrips"][0]).map(lambda d: d[0]["SubTrips"][0]["StartPosition"]["NumberOfSatellites"]).frequencies(sort=True).compute()
Possibly the first array can be sorted out with this, but I'm not sure:
subtrips = records.flatten()
How do I extract all the StartPosition record data into a Dask dataframe? Basically one row per StartPosition, with its attributes as columns, mainly the latitude and longitude.
Finally, I want to plot the Dask positions onto a mapbox map using datashader.
It looks like you are close to the result. Besides the video, another useful resource is this tutorial.
To handle custom logic, like the optionality of StartPosition, it's best to define custom functions:
from json import loads
from dask.bag import read_text

def process(record):
    # yield one flat dict per StartPosition found in the record
    for entry in record.get("SubTrips", []):  # SubTrips may be absent
        if data_of_interest := entry.get("StartPosition"):  # StartPosition is optional
            yield {
                k: v
                for k, v in data_of_interest.items()
                if k in ["DriverId", "Longitude", "Latitude"]
            }

def process_json(json_data):
    # this will produce a list of records, so make sure to flatten
    return [result for record in json_data for result in process(record)]

bag = (
    read_text("7044247166111462221*.json")
    .map(loads)
    .map(process_json)
    .flatten()
    .to_dataframe()
)
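Before running this over the whole directory tree, the extraction logic can be sanity-checked on an in-memory sample. This is a minimal sketch using a trimmed-down, hypothetical version of the record shown in the question (no Dask required):

```python
# standalone sanity check of the extraction logic
def process(record):
    # yield one flat dict per StartPosition found in the record
    for entry in record.get("SubTrips", []):
        if data_of_interest := entry.get("StartPosition"):
            yield {k: v for k, v in data_of_interest.items()
                   if k in ["DriverId", "Longitude", "Latitude"]}

# trimmed-down version of the sample record from the question
sample = [
    {"SubTrips": [
        {"StartPosition": {"Longitude": 18.4727, "Latitude": -33.8071,
                           "DriverId": -9119609258885412605, "Heading": 0},
         "EndPosition": {"Longitude": 18.5, "Latitude": -33.9}},
        {"EndPosition": {"Longitude": 18.6, "Latitude": -34.0}},  # no StartPosition
    ]}
]

rows = [row for record in sample for row in process(record)]
print(rows)  # one dict per StartPosition; keys other than the three kept ones are dropped
```

The sub-trip without a StartPosition is skipped silently, which is exactly the optionality the filter in the question was working around.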
Applying json.loads to the text/string data gives a dict. The key function is process, which defines the logic for transforming a specific record, while process_json is a wrapper that handles the general case (it assumes multiple records per file; if there is only one record per file, this wrapper is not needed). Before converting the data to a dataframe, we need to flatten it, so that every element of the bag is a single dictionary.