如何使用 python 提高此解析器的速度?
How do I improve the speed of this parser using python?
我目前正在解析来自瑞典 public 传输网络的历史延迟数据。从 1 月 27 日开始,我有大约 5700 个文件(每 15 秒一个),其中包含网络中活跃行程中车辆的瞬时延迟数据。不幸的是,它有很多开销/重复数据,所以我想解析出相关的东西来对其进行可视化。
但是,当我尝试使用下面的脚本解析和过滤掉行程级别的相关延迟数据时,它的执行速度非常慢。现在已经 运行 超过 1.5 小时(在我的 2019 Macbook Pro 15' 上)并且尚未完成。
- 如何优化/改进此 python 解析器?
- 或者我是否应该为此任务减少文件数量,即数据收集的频率?
非常感谢您。
from google.transit import gtfs_realtime_pb2
import gzip
import os
import datetime
import csv
import numpy as np
directory = '../data/tripu/27/'
datapoints = np.zeros((0,3), int)
read_trips = set()
# Loop through all files in directory
for filename in os.listdir(directory)[::3]:
try:
# Uncompress and parse protobuff-file using gtfs_realtime_pb2
with gzip.open(directory + filename, 'rb') as file:
response = file.read()
feed = gtfs_realtime_pb2.FeedMessage()
feed.ParseFromString(response)
print("Filename: " + filename, "Total entities: " + str(len(feed.entity)))
for trip in feed.entity:
if trip.trip_update.trip.trip_id not in read_trips:
try:
if len(trip.trip_update.stop_time_update) == len(stopsOnTrip[trip.trip_update.trip.trip_id]):
print("\t","Adding delays for",len(trip.trip_update.stop_time_update),"stops, on trip_id",trip.trip_update.trip.trip_id)
for i, stop_time_update in enumerate(trip.trip_update.stop_time_update[:-1]):
# Store the delay data point (arrival difference of two ascending nodes)
delay = int(trip.trip_update.stop_time_update[i+1].arrival.delay-trip.trip_update.stop_time_update[i].arrival.delay)
# Store contextual metadata (timestamp and edgeID) for the unique delay data point
ts = int(trip.trip_update.stop_time_update[i+1].arrival.time)
key = int(str(trip.trip_update.stop_time_update[i].stop_id) + str(trip.trip_update.stop_time_update[i+1].stop_id))
# Append data to numpy array
datapoints = np.append(datapoints, np.array([[key,ts,delay]]), axis=0)
read_trips.add(trip.trip_update.trip.trip_id)
except KeyError:
continue
else:
continue
except OSError:
continue
我怀疑这里的问题是重复调用 np.append
向 numpy 数组添加新行。因为numpy数组的大小在创建时是固定的,所以np.append()
必须创建一个新的数组,也就是说要复制之前的数组。在每个循环中,数组都更大,因此所有这些副本都会增加执行时间的二次因子。当数组很大时(显然它在您的应用程序中),这变得很重要。
作为替代方案,您可以只创建一个普通的 Python 元组列表,然后如有必要,在最后将其转换为完整的 numpy 数组。
即(仅修改行):
datapoints = []
# ...
datapoints.append((key,ts,delay))
# ...
npdata = np.array(datapoints, dtype=int)
我仍然认为解析例程是您的瓶颈(即使它确实来自 Google),但是所有那些 '.' 都让我很烦! (而且它们确实会稍微降低性能。)此外,我将您的 i, i+1 迭代转换为使用两个迭代器压缩更新列表,这是一种更高级的列表处理方式。再加上 cur/next_update
名字帮助我在你想引用一个与另一个时保持直截了当。最后,我删除了尾随 "else: continue",因为无论如何你都在 for 循环的末尾。
for trip in feed.entity:
this_trip_update = trip.trip_update
this_trip_id = this_trip_update.trip.trip_id
if this_trip_id not in read_trips:
try:
if len(this_trip_update.stop_time_update) == len(stopsOnTrip[this_trip_id]):
print("\t", "Adding delays for", len(this_trip_update.stop_time_update), "stops, on trip_id",
this_trip_id)
# create two iterators to walk through the list of updates
cur_updates = iter(this_trip_update.stop_time_update)
nxt_updates = iter(this_trip_update.stop_time_update)
# advance the nxt_updates iter so it is one ahead of cur_updates
next(nxt_updates)
for cur_update, next_update in zip(cur_updates, nxt_updates):
# Store the delay data point (arrival difference of two ascending nodes)
delay = int(nxt_update.arrival.delay - cur_update.arrival.delay)
# Store contextual metadata (timestamp and edgeID) for the unique delay data point
ts = int(next_update.arrival.time)
key = "{}/{}".format(cur_update.stop_id, next_update.stop_id)
# Append data to numpy array
datapoints = np.append(datapoints, np.array([[key, ts, delay]]), axis=0)
read_trips.add(this_trip_id)
except KeyError:
continue
这段代码 应该 与您发布的代码相同,我也不希望有重大的性能提升,但也许当您回过头来看时这会更易于维护6 个月后。
(这可能 更适合 CodeReview,但我很少去那里。)
我目前正在解析来自瑞典 public 传输网络的历史延迟数据。从 1 月 27 日开始,我有大约 5700 个文件(每 15 秒一个),其中包含网络中活跃行程中车辆的瞬时延迟数据。不幸的是,它有很多开销/重复数据,所以我想解析出相关的东西来对其进行可视化。
但是,当我尝试使用下面的脚本解析和过滤掉行程级别的相关延迟数据时,它的执行速度非常慢。现在已经 运行 超过 1.5 小时(在我的 2019 Macbook Pro 15' 上)并且尚未完成。
- 如何优化/改进此 python 解析器?
- 或者我是否应该为此任务减少文件数量,即数据收集的频率?
非常感谢您。
from google.transit import gtfs_realtime_pb2
import gzip
import os
import datetime
import csv
import numpy as np
directory = '../data/tripu/27/'
datapoints = np.zeros((0,3), int)
read_trips = set()
# Loop through all files in directory
for filename in os.listdir(directory)[::3]:
try:
# Uncompress and parse protobuff-file using gtfs_realtime_pb2
with gzip.open(directory + filename, 'rb') as file:
response = file.read()
feed = gtfs_realtime_pb2.FeedMessage()
feed.ParseFromString(response)
print("Filename: " + filename, "Total entities: " + str(len(feed.entity)))
for trip in feed.entity:
if trip.trip_update.trip.trip_id not in read_trips:
try:
if len(trip.trip_update.stop_time_update) == len(stopsOnTrip[trip.trip_update.trip.trip_id]):
print("\t","Adding delays for",len(trip.trip_update.stop_time_update),"stops, on trip_id",trip.trip_update.trip.trip_id)
for i, stop_time_update in enumerate(trip.trip_update.stop_time_update[:-1]):
# Store the delay data point (arrival difference of two ascending nodes)
delay = int(trip.trip_update.stop_time_update[i+1].arrival.delay-trip.trip_update.stop_time_update[i].arrival.delay)
# Store contextual metadata (timestamp and edgeID) for the unique delay data point
ts = int(trip.trip_update.stop_time_update[i+1].arrival.time)
key = int(str(trip.trip_update.stop_time_update[i].stop_id) + str(trip.trip_update.stop_time_update[i+1].stop_id))
# Append data to numpy array
datapoints = np.append(datapoints, np.array([[key,ts,delay]]), axis=0)
read_trips.add(trip.trip_update.trip.trip_id)
except KeyError:
continue
else:
continue
except OSError:
continue
我怀疑这里的问题是重复调用 np.append
向 numpy 数组添加新行。因为numpy数组的大小在创建时是固定的,所以np.append()
必须创建一个新的数组,也就是说要复制之前的数组。在每个循环中,数组都更大,因此所有这些副本都会增加执行时间的二次因子。当数组很大时(显然它在您的应用程序中),这变得很重要。
作为替代方案,您可以只创建一个普通的 Python 元组列表,然后如有必要,在最后将其转换为完整的 numpy 数组。
即(仅修改行):
datapoints = []
# ...
datapoints.append((key,ts,delay))
# ...
npdata = np.array(datapoints, dtype=int)
我仍然认为解析例程是您的瓶颈(即使它确实来自 Google),但是所有那些 '.' 都让我很烦! (而且它们确实会稍微降低性能。)此外,我将您的 i, i+1 迭代转换为使用两个迭代器压缩更新列表,这是一种更高级的列表处理方式。再加上 cur/next_update
名字帮助我在你想引用一个与另一个时保持直截了当。最后,我删除了尾随 "else: continue",因为无论如何你都在 for 循环的末尾。
for trip in feed.entity:
this_trip_update = trip.trip_update
this_trip_id = this_trip_update.trip.trip_id
if this_trip_id not in read_trips:
try:
if len(this_trip_update.stop_time_update) == len(stopsOnTrip[this_trip_id]):
print("\t", "Adding delays for", len(this_trip_update.stop_time_update), "stops, on trip_id",
this_trip_id)
# create two iterators to walk through the list of updates
cur_updates = iter(this_trip_update.stop_time_update)
nxt_updates = iter(this_trip_update.stop_time_update)
# advance the nxt_updates iter so it is one ahead of cur_updates
next(nxt_updates)
for cur_update, next_update in zip(cur_updates, nxt_updates):
# Store the delay data point (arrival difference of two ascending nodes)
delay = int(nxt_update.arrival.delay - cur_update.arrival.delay)
# Store contextual metadata (timestamp and edgeID) for the unique delay data point
ts = int(next_update.arrival.time)
key = "{}/{}".format(cur_update.stop_id, next_update.stop_id)
# Append data to numpy array
datapoints = np.append(datapoints, np.array([[key, ts, delay]]), axis=0)
read_trips.add(this_trip_id)
except KeyError:
continue
这段代码 应该 与您发布的代码相同,我也不希望有重大的性能提升,但也许当您回过头来看时这会更易于维护6 个月后。
(这可能 更适合 CodeReview,但我很少去那里。)