如何将 30GB 的大 bz2 文件拆分为多个小的 bz2 文件并为每个文件添加 header

How to split big 30GB bz2 file into multiple small bz2 files and add a header to each

我有大量 bz2 格式的文件(每个 30GB),但没有任何 header。我可以使用以下 pileline.

轻松地将它们分成 500M 每个尺寸
bzcat logging.abc_gps.bz2 | pv | split -b 500M -d -a 4 --filter='bzip > $FILE.csv.bz2' - splitted_file-

但是我无法添加 header ['a' 'b' 'c' 'd' 'e' 'f' 'timestamp'] 我想包含每个拆分的 bz2 文件。

更重要的是我想拆分文件不是基于 500M,而是我想每天拆分 bz2 文件(例如:splitted_file_2021-01-01.csv.bz2splitted_file_2021-01-02.csv.bz2)根据数据中timestamp的内容

数据为tab-delimited文本,如下(没有header,需要添加):

19252547212 1   3041    2   1   74.18   1.8504  2021-05-01 00:00:00
19252547213 1   5055    2   1   0       0       2021-05-01 00:00:00
19252547214 1   5073    1   1   53.81   0.1836  2021-05-01 00:00:00

您可以使用 bz2 包打开 BZ2 编码文件并将它们视为常规文件对象。以二进制形式读/写有一个小的性能优势。假设您的数据是 ASCII 或 UTF-8,并且数据中不需要转义制表符,您可以逐行读取文件,在新时间戳出现时打开并写入输出。

import bz2
import os

outfile = None
date = b""

with bz2.open("file") as fileobj:
    for line in filobj:
        # get date from, ex. "2021-05-01 00:00:00", timestamp
        new_date = line.split(b"\t")[7].split(b" ")[0]
        # roll to new file as needed, appending, so existing data not overwritten
        if new_date != date:
            date = new_date
            new_file = f"splitted_file_{new_date}.csv.bz2"
            exists = os.path.exists(new_file)
            outfile = bz2.open(new_file, "ab")
            if not exists:
                outfile.write(b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n")
        # write the row
        outfile.writeline(line)
if outfile:
    outfile.close()

您可以使用管道加快此过程。将解密和加密都交给单独的 bzip2 进程,这些进程将 运行 在不同的内核上并行执行。除了 shell 管道之外,您还可以创建管道和文件以在脚本本身中执行此操作。假设 bzip2 存在于您的系统上,您可以执行以下操作。我添加了 tqdm 模块来打印进度。

#!/usr/bin/env python3

import subprocess as subp
from pathlib import Path
import sys
import tqdm

# TODO: Better command line
try:
    in_file_name = Path(sys.argv[1])
except IndexError:
    print("usage: unbzcsv.py filename")
    exit(1)

# build the format string used for generating output file names
out_file_name_fmt = "{}-{{}}.{}".format(*in_file_name.name.split(".", maxsplit=1))
out_file = None
date = b""
bzwriter = None
bzfile = None

# run bzip2 to decompress to stdout
bzreader = subp.Popen(["bzip2", "--decompress", "--stdout", in_file_name], 
        stdin=subp.DEVNULL, stdout=subp.PIPE)

# use tqdm to display progress as line count
progress = tqdm.tqdm(bzreader.stdout, desc="Lines", unit=" lines", unit_scale=True)

# read lines and fan out to files
try:
    for line in progress:
        # get date from, ex. "2021-05-01 00:00:00", timestamp
        new_date = line.split(b"\t")[7].split(b" ")[0]
        # roll to new file as needed, appending, so existing data not overwritten
        if new_date != date:
            date = new_date
            out_file_name = out_file_name_fmt.format(date.decode("utf-8"))
            if bzwriter is not None:
                bzwriter.stdin.close()
                bzwriter.wait()
                bzwriter = None
                bzfile.close()
            print("\nwriting", out_file_name)
            progress.refresh()
            bzfile = open(out_file_name, "wb")
            bzwriter = subp.Popen(["bzip2", "--compress"],
                    stdin=subp.PIPE, stdout=bzfile)
        # write the row
        bzwriter.stdin.write(line)
finally:
    bzreader.terminate() # in case of error
    if bzwriter:
        bzwriter.stdin.close()
        bzwriter.wait()
        bzfile.close()
    bzreader.wait()