将速度从 zip 提高到 json

Improving speed from zip to json

我有一些财务数据组织为 zip 文件。自 2007 年以来每个交易日一个 zip 文件。每个文件包含 150'000 到 550'000 records/rows 和 20 columns/features 左右。我需要按照使用逻辑重新组织数据: /DataFolder/{Year}/{Column1}/{Column2}/{filename}.json

为清楚起见,Column1 是 Ticker 列,Column2 是 ExpiryDate 列。 此外,对于单个文件中的每个 ticker-expiryDate 对,我想填充一个新列(0 或 1),这是按货币行使价(给定 StockPrice 列中的值,我得到最接近行使价列的值股票价格)。

然后我将数据(对于每个 ticker-expiryDate 对)作为 json 文件写入我的硬盘。

最初我打算采用一些“蛮力”方法,即将 zip 文件作为数据帧加载,并对代码和到期日期进行一些排序。显然这不是最优的。处理一个 zip 文件大约需要 12 分钟。

然后我尝试使用 pandas groupby 函数生成的组来改进我的代码。它确实将每个 zip 文件的性能提高到略低于 10 分钟。但仍然考虑到我有近 3'000 个文件要处理我需要大量改进...

我使用一个函数来获得给定股票价格的最接近行使价:

def find_nearest(array, value):
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return array[idx]

我对每个 zip 文件使用以下代码:

def process_directory(files2proc):
    #files2proc list of files to process

    for filename in files2proc:    
        df_zip = pd.read_csv(filename)
        df_grouped = df_zip.groupby(['ticker', 'expirDate'])

        for name, group in df_grouped:
            expYear = str(name[1])[-4:]
            exp_fmt = str(pd.to_datetime(name[1]).date()).replace('-', '')


            temp_path = '/Volumes/DataFolder/{}/{}/{}/{}/'.format(name[0][0], name[0], expYear, exp_fmt)
            dir_path = pathlib.Path(temp_path)
            dir_path.mkdir(parents=True, exist_ok=True)

            df_temp = df_grouped.get_group(name).reset_index(drop=True)

            strikes = df_temp['strike'].unique()
            stkprice = df_temp['stkPx'].iloc[0]
            atm_strike = find_nearest(strikes, stkprice)

            df_temp['atTheMoney'] = df_temp['strike'] == atm_strike

            file_path = temp_path + '_' + name[0] + '_' + exp_fmt + '.json'

            if not os.path.exists(file_path):
                df_temp.to_json(file_path, orient="records")
            else:
                df_orig = pd.read_json(file_path, orient="records")
                df_calls_up = df_orig.append(df_temp)
                df_calls_up.to_json(file_path, orient="records")

我担心 .reset_index 和 .iloc 语句很耗时。如何绕过对 find_nearest 的调用以提高速度?如果没有,是否有一种向量化的方式来填充平价列? 更一般地说,我认为我创建了太多对整体速度没有帮助的临时数据帧 (?)

编辑:zip 仅包含一个 csv 文件。我在下面复制了 csv 示例的一部分:

ticker stkPx expirDate  strike (other columns) trade_date
AAPL   34.3  1/20/2007  32.5                   1/3/2007 
AAPL   34.3  1/20/2007  35                     1/3/2007
AAPL   34.3  1/20/2007  37.5                   1/3/2007
AAPL   34.3  2/17/2007  30                     1/3/2007
AAPL   34.3  2/17/2007  35                     1/3/2007
AAPL   34.3  2/17/2007  40                     1/3/2007
(...)

我需要将此文件重新组织成几个 json 文件,如下所示: Datafolder/A/AAPL/2007/20070120/_AAPL_20070120.json Datafolder/A/AAPL/2007/20070120/_AAPL_20070217.json

使用这种格式: Datafolder/{TickerInitial}/{Ticker}/{year of expirDate}/{expirDate as yyyymmdd}/{ticker}{expirDate as yyyymmdd}.json

每个 json 文件都应该有一个名为 atTheMoney 的额外列(位于原始 zip 文件列的顶部)以标记 at the money 选项,即 _AAPL_20070120.json:

ticker stkPx expirDate  strike (other columns) trade_date atTheMoney
AAPL   34.3  1/20/2007  32.5                   1/3/2007   False
AAPL   34.3  1/20/2007  35                     1/3/2007   True
AAPL   34.3  1/20/2007  37.5                   1/3/2007   False

和 _AAPL_20070217.json:

ticker stkPx expirDate  strike (other columns) trade_date  atTheMoney
AAPL   34.3  2/17/2007  30                     1/3/2007    False
AAPL   34.3  2/17/2007  35                     1/3/2007    True
AAPL   34.3  2/17/2007  40                     1/3/2007    False

货币标志对应于给定 stkPx 列值的最接近行使价列值(即行使价 35 是 34.3 最接近 [32.5, 35, 37.5] 或 [30, 35, 40] 的值)

Edit2:这是我修改后的 JonSG 版本:

def process_file(zip_path, ifilename):

    stage_1 = collections.defaultdict(list)
    with zipfile.ZipFile(zip_path) as zf:
        with zf.open(ifilename + ".csv", "r") as file_in:
            reader = csv.reader(TextIOWrapper(file_in, 'utf-8'))
            colnames = {v: i for i, v in enumerate(next(reader))}
            for row in reader:
                key = f"{row[colnames['ticker']]}_{row[colnames['expirDate']]}"
                stage_1[key].append(row)

    for key, value in stage_1.items():
        min_strike_delta = float("inf")
        for row in value:
            row.append(abs(float(row[colnames['stkPx']]) - float(row[colnames['strike']])))
            if min_strike_delta > row[-1]:
                min_strike_delta = row[-1]

    value = [
        {
            **{cname: row[cvalue] for cname, cvalue in colnames.items()},
            **{"atTheMoney": row[-1] == min_strike_delta}
        }
        for row in value
    ]

    ticker, expirDate = key.split("_")
    exp_fmt = str(pd.to_datetime(expirDate).date()).replace('-', '')
    temp_path = f"/Volumes/DataCenter/{ticker[0]}/{ticker}/{exp_fmt[:4]}/{exp_fmt}/"
    filename = f"{temp_path}_{ticker}_{exp_fmt}.json"
    dir_path = pathlib.Path(temp_path)
    dir_path.mkdir(parents=True, exist_ok=True)

    with open(filename, "a+") as file_out:
        for row in value:
            file_out.write(json.dumps(row) + ",\n")

我想我可能会考虑以更手动的方式执行此操作。

从我们的模块开始:

import collections
import csv
import json

那么我们可以:

## ---------------------
## Gather the rows of data by key
## ---------------------
stage_1 = collections.defaultdict(list)
with open("strike.csv", "r") as file_in:
    reader = csv.reader(file_in)
    colnames = {v:i for i,v in enumerate(next(reader))}
    for row in reader:
        key = f"{row[colnames['ticker']]}_{row[colnames['expirDate']]}"
        stage_1[key].append(row)
## ---------------------

现在我们有一个字典,其中包含您的分组 ID 的键和给定键的各个行的值。

现在,对于上述字典中的每个键,我们将确定该键所有行的 atTheMoney 状态,然后写出一个 csv。

## ---------------------
## For each key find the minimum strike delta and update the values
## based on it. then write to output file
## ---------------------
for key, value in stage_1.items():
    ## --------------------
    ## find the min_strike_delta 
    ## --------------------
    min_strike_delta = float("inf")
    for row in value:
        row.append(abs(float(row[colnames['stkPx']]) - float(row[colnames['strike']])))
        if min_strike_delta > row[-1]:
            min_strike_delta = row[-1]
    ## --------------------

    ## --------------------
    ## Cast value to a list of dictionaries rather than a list of lists.
    ## This is a little heavier so let's just do it part by part rather than as part of stage 1
    ## --------------------
    value = [
        {
            **{cname: row[cvalue] for cname, cvalue in colnames.items()},
            **{"atTheMoney": row[-1] == min_strike_delta}
        }
        for row in value
    ]
    ## --------------------

    ## --------------------
    ## Create a result file name.
    ## This is just a simple one for testing and yours is more complicated
    ## --------------------
    ticker, expirDate = key.split("_")
    filename = f"{ticker[0]}_{ticker}_{expirDate.replace('/', '')}.json"
    ## --------------------

    ## --------------------
    ## Overwrite the list to output file.
    ## --------------------
    #with open(filename, "w", newline="") as file_out:
    #    json.dump(value, file_out)
    ## --------------------

    ## --------------------
    ## To append rows to a prior file
    ## You probably don't want an actual json array
    ## you probably want rows of json records...
    ## --------------------
    with open(filename, "a+") as file_out:
        for row in value:
            file_out.write(json.dumps(row) + ",\n")
    ## --------------------
## ---------------------

使用您的测试数据,这会生成几个文件:

A_AAPL_1202007.json:

[
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "32.5", "trade_date": "1/3/2007", "atTheMoney": false},
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "35", "trade_date": "1/3/2007", "atTheMoney": true},
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "37.5", "trade_date": "1/3/2007", "atTheMoney": false}
]

A_AAPL_2172007.json

[
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "30", "trade_date": "1/3/2007", "atTheMoney": false},
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "35", "trade_date": "1/3/2007", "atTheMoney": true},
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "40", "trade_date": "1/3/2007", "atTheMoney": false}
]

希望这比你现在拥有的更快。

因为我们知道数据是按键聚类的,所以我们可以利用这一事实来减少我们需要做的工作量。我们现在只需要读取一次数据。

import csv
import json

## ---------------------
## Funtion to handle writing data.
## this way we don't repeate ourselves
## ---------------------
def append_data(key, rows, min_strike_delta, colnames):
    if not rows:
        return

    ticker, expirDate = key.split("_")
    filename = f"{ticker[0]}_{ticker}_{expirDate.replace('/', '')}.json"
    with open(filename, "a+") as file_out:
        for row in rows:
            row[-1] = row[-1] == min_strike_delta
            row_json = {cname: row[cvalue] for cname, cvalue in colnames.items()}
            file_out.write(json.dumps(row_json) + "\n")
## ---------------------

## ---------------------
## Let's use the fact that rows are already grouped by key to our advantage:
## ---------------------
with open("strike.csv", "r") as file_in:
    reader = csv.reader(file_in)
    colnames = {v:i for i,v in enumerate(next(reader) + ["atTheMoney"])}

    current_key = ""
    current_rows = []
    current_min_strike_delta = float("inf")

    for row in reader:
        key = f"{row[colnames['ticker']]}_{row[colnames['expirDate']]}"

        ## ---------------------
        ## If the keys are different try to write out the prior batch of rows
        ## ---------------------
        if key != current_key:
            append_data(current_key, current_rows, current_min_strike_delta, colnames)
            current_key = key
            current_rows = []
            current_min_strike_delta = float("inf")
        ## ---------------------

        strike_delta = abs(float(row[colnames['stkPx']]) - float(row[colnames['strike']]))
        current_min_strike_delta = min(current_min_strike_delta, strike_delta)
        row.append(strike_delta)
        current_rows.append(row)

## ---------------------
## If there are any remaining rows write them out
## ---------------------
append_data(current_key, current_rows, current_min_strike_delta, colnames)
## ---------------------

当运行时,它应该给我们两个文件:

A_AAPL_1202007.json:

{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "32.5", "trade_date": "1/3/2007", "atTheMoney": false}
{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "35", "trade_date": "1/3/2007", "atTheMoney": true}
{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "37.5", "trade_date": "1/3/2007", "atTheMoney": false}

A_AAPL_2172007.json:

{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "30", "trade_date": "1/3/2007", "atTheMoney": false}
{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "35", "trade_date": "1/3/2007", "atTheMoney": true}
{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "40", "trade_date": "1/3/2007", "atTheMoney": false}

此时,速度变慢可能是由于反复打开文件并寻找文件末尾进行追加。