如何使用 python 提高此解析器的速度?

How do I improve the speed of this parser using python?

我目前正在解析来自瑞典 public 传输网络的历史延迟数据。从 1 月 27 日开始,我有大约 5700 个文件(每 15 秒一个),其中包含网络中活跃行程中车辆的瞬时延迟数据。不幸的是,它有很多开销/重复数据,所以我想解析出相关的东西来对其进行可视化。

但是,当我尝试使用下面的脚本解析和过滤掉行程级别的相关延迟数据时,它的执行速度非常慢。现在已经 运行 超过 1.5 小时(在我的 2019 Macbook Pro 15' 上)并且尚未完成。

非常感谢您。

from google.transit import gtfs_realtime_pb2
import gzip
import os
import datetime
import csv
import numpy as np

directory = '../data/tripu/27/'
datapoints = np.zeros((0,3), int)
read_trips = set()

# Loop through all files in directory
for filename in os.listdir(directory)[::3]:

    try:
        # Uncompress and parse protobuff-file using gtfs_realtime_pb2
        with gzip.open(directory + filename, 'rb') as file:
            response = file.read()
            feed = gtfs_realtime_pb2.FeedMessage()
            feed.ParseFromString(response)

            print("Filename: " + filename, "Total entities: " + str(len(feed.entity)))

            for trip in feed.entity:
                if trip.trip_update.trip.trip_id not in read_trips:

                    try:
                        if len(trip.trip_update.stop_time_update) == len(stopsOnTrip[trip.trip_update.trip.trip_id]):
                            print("\t","Adding delays for",len(trip.trip_update.stop_time_update),"stops, on trip_id",trip.trip_update.trip.trip_id)

                            for i, stop_time_update in enumerate(trip.trip_update.stop_time_update[:-1]):

                                # Store the delay data point (arrival difference of two ascending nodes)
                                delay = int(trip.trip_update.stop_time_update[i+1].arrival.delay-trip.trip_update.stop_time_update[i].arrival.delay)

                                # Store contextual metadata (timestamp and edgeID) for the unique delay data point
                                ts = int(trip.trip_update.stop_time_update[i+1].arrival.time)
                                key = int(str(trip.trip_update.stop_time_update[i].stop_id) + str(trip.trip_update.stop_time_update[i+1].stop_id))

                                # Append data to numpy array
                                datapoints = np.append(datapoints, np.array([[key,ts,delay]]), axis=0)

                            read_trips.add(trip.trip_update.trip.trip_id)
                    except KeyError:
                        continue
                else:
                    continue
    except OSError:
        continue

我怀疑这里的问题是重复调用 np.append 向 numpy 数组添加新行。因为numpy数组的大小在创建时是固定的,所以np.append()必须创建一个新的数组,也就是说要复制之前的数组。在每个循环中,数组都更大,因此所有这些副本都会增加执行时间的二次因子。当数组很大时(显然它在您的应用程序中),这变得很重要。

作为替代方案,您可以只创建一个普通的 Python 元组列表,然后如有必要,在最后将其转换为完整的 numpy 数组。

即(仅修改行):

datapoints = []
# ...
                            datapoints.append((key,ts,delay))
# ...
npdata = np.array(datapoints, dtype=int)

我仍然认为解析例程是您的瓶颈(即使它确实来自 Google),但是所有那些 '.' 都让我很烦! (而且它们确实会稍微降低性能。)此外,我将您的 i, i+1 迭代转换为使用两个迭代器压缩更新列表,这是一种更高级的列表处理方式。再加上 cur/next_update 名字帮助我在你想引用一个与另一个时保持直截了当。最后,我删除了尾随 "else: continue",因为无论如何你都在 for 循环的末尾。

for trip in feed.entity:
    this_trip_update = trip.trip_update 
    this_trip_id = this_trip_update.trip.trip_id
    if this_trip_id not in read_trips:

        try:
            if len(this_trip_update.stop_time_update) == len(stopsOnTrip[this_trip_id]):
                print("\t", "Adding delays for", len(this_trip_update.stop_time_update), "stops, on trip_id",
                      this_trip_id)

                # create two iterators to walk through the list of updates
                cur_updates = iter(this_trip_update.stop_time_update)
                nxt_updates = iter(this_trip_update.stop_time_update)
                # advance the nxt_updates iter so it is one ahead of cur_updates
                next(nxt_updates)

                for cur_update, next_update in zip(cur_updates, nxt_updates):
                    # Store the delay data point (arrival difference of two ascending nodes)
                    delay = int(nxt_update.arrival.delay - cur_update.arrival.delay)

                    # Store contextual metadata (timestamp and edgeID) for the unique delay data point
                    ts = int(next_update.arrival.time)
                    key = "{}/{}".format(cur_update.stop_id, next_update.stop_id)

                    # Append data to numpy array
                    datapoints = np.append(datapoints, np.array([[key, ts, delay]]), axis=0)

                read_trips.add(this_trip_id)
        except KeyError:
            continue

这段代码 应该 与您发布的代码相同,我也不希望有重大的性能提升,但也许当您回过头来看时这会更易于维护6 个月后。

(这可能 更适合 CodeReview,但我很少去那里。)