如何将 30GB 的大 bz2 文件拆分为多个小的 bz2 文件并为每个文件添加 header
How to split big 30GB bz2 file into multiple small bz2 files and add a header to each
我有大量 bz2
格式的文件(每个 30GB
),但没有任何 header。我可以使用以下 pileline
.
轻松地将它们分成 500M
每个尺寸
bzcat logging.abc_gps.bz2 | pv | split -b 500M -d -a 4 --filter='bzip > $FILE.csv.bz2' - splitted_file-
但是我无法添加 header ['a' 'b' 'c' 'd' 'e' 'f' 'timestamp']
我想包含每个拆分的 bz2
文件。
更重要的是我想拆分文件不是基于 500M
,而是我想每天拆分 bz2
文件(例如:splitted_file_2021-01-01.csv.bz2
和 splitted_file_2021-01-02.csv.bz2
)根据数据中timestamp
的内容
数据为tab-delimited文本,如下(没有header,需要添加):
19252547212 1 3041 2 1 74.18 1.8504 2021-05-01 00:00:00
19252547213 1 5055 2 1 0 0 2021-05-01 00:00:00
19252547214 1 5073 1 1 53.81 0.1836 2021-05-01 00:00:00
您可以使用 bz2
包打开 BZ2 编码文件并将它们视为常规文件对象。以二进制形式读/写有一个小的性能优势。假设您的数据是 ASCII 或 UTF-8,并且数据中不需要转义制表符,您可以逐行读取文件,在新时间戳出现时打开并写入输出。
import bz2
import os
outfile = None
date = b""
with bz2.open("file") as fileobj:
for line in filobj:
# get date from, ex. "2021-05-01 00:00:00", timestamp
new_date = line.split(b"\t")[7].split(b" ")[0]
# roll to new file as needed, appending, so existing data not overwritten
if new_date != date:
date = new_date
new_file = f"splitted_file_{new_date}.csv.bz2"
exists = os.path.exists(new_file)
outfile = bz2.open(new_file, "ab")
if not exists:
outfile.write(b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n")
# write the row
outfile.writeline(line)
if outfile:
outfile.close()
您可以使用管道加快此过程。将解密和加密都交给单独的 bzip2 进程,这些进程将 运行 在不同的内核上并行执行。除了 shell 管道之外,您还可以创建管道和文件以在脚本本身中执行此操作。假设 bzip2
存在于您的系统上,您可以执行以下操作。我添加了 tqdm
模块来打印进度。
#!/usr/bin/env python3
import subprocess as subp
from pathlib import Path
import sys
import tqdm
# TODO: Better command line
try:
in_file_name = Path(sys.argv[1])
except IndexError:
print("usage: unbzcsv.py filename")
exit(1)
# build the format string used for generating output file names
out_file_name_fmt = "{}-{{}}.{}".format(*in_file_name.name.split(".", maxsplit=1))
out_file = None
date = b""
bzwriter = None
bzfile = None
# run bzip2 to decompress to stdout
bzreader = subp.Popen(["bzip2", "--decompress", "--stdout", in_file_name],
stdin=subp.DEVNULL, stdout=subp.PIPE)
# use tqdm to display progress as line count
progress = tqdm.tqdm(bzreader.stdout, desc="Lines", unit=" lines", unit_scale=True)
# read lines and fan out to files
try:
for line in progress:
# get date from, ex. "2021-05-01 00:00:00", timestamp
new_date = line.split(b"\t")[7].split(b" ")[0]
# roll to new file as needed, appending, so existing data not overwritten
if new_date != date:
date = new_date
out_file_name = out_file_name_fmt.format(date.decode("utf-8"))
if bzwriter is not None:
bzwriter.stdin.close()
bzwriter.wait()
bzwriter = None
bzfile.close()
print("\nwriting", out_file_name)
progress.refresh()
bzfile = open(out_file_name, "wb")
bzwriter = subp.Popen(["bzip2", "--compress"],
stdin=subp.PIPE, stdout=bzfile)
# write the row
bzwriter.stdin.write(line)
finally:
bzreader.terminate() # in case of error
if bzwriter:
bzwriter.stdin.close()
bzwriter.wait()
bzfile.close()
bzreader.wait()
我有大量 bz2
格式的文件(每个 30GB
),但没有任何 header。我可以使用以下 pileline
.
500M
每个尺寸
bzcat logging.abc_gps.bz2 | pv | split -b 500M -d -a 4 --filter='bzip > $FILE.csv.bz2' - splitted_file-
但是我无法添加 header ['a' 'b' 'c' 'd' 'e' 'f' 'timestamp']
我想包含每个拆分的 bz2
文件。
更重要的是我想拆分文件不是基于 500M
,而是我想每天拆分 bz2
文件(例如:splitted_file_2021-01-01.csv.bz2
和 splitted_file_2021-01-02.csv.bz2
)根据数据中timestamp
的内容
数据为tab-delimited文本,如下(没有header,需要添加):
19252547212 1 3041 2 1 74.18 1.8504 2021-05-01 00:00:00
19252547213 1 5055 2 1 0 0 2021-05-01 00:00:00
19252547214 1 5073 1 1 53.81 0.1836 2021-05-01 00:00:00
您可以使用 bz2
包打开 BZ2 编码文件并将它们视为常规文件对象。以二进制形式读/写有一个小的性能优势。假设您的数据是 ASCII 或 UTF-8,并且数据中不需要转义制表符,您可以逐行读取文件,在新时间戳出现时打开并写入输出。
import bz2
import os
outfile = None
date = b""
with bz2.open("file") as fileobj:
for line in filobj:
# get date from, ex. "2021-05-01 00:00:00", timestamp
new_date = line.split(b"\t")[7].split(b" ")[0]
# roll to new file as needed, appending, so existing data not overwritten
if new_date != date:
date = new_date
new_file = f"splitted_file_{new_date}.csv.bz2"
exists = os.path.exists(new_file)
outfile = bz2.open(new_file, "ab")
if not exists:
outfile.write(b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n")
# write the row
outfile.writeline(line)
if outfile:
outfile.close()
您可以使用管道加快此过程。将解密和加密都交给单独的 bzip2 进程,这些进程将 运行 在不同的内核上并行执行。除了 shell 管道之外,您还可以创建管道和文件以在脚本本身中执行此操作。假设 bzip2
存在于您的系统上,您可以执行以下操作。我添加了 tqdm
模块来打印进度。
#!/usr/bin/env python3
import subprocess as subp
from pathlib import Path
import sys
import tqdm
# TODO: Better command line
try:
in_file_name = Path(sys.argv[1])
except IndexError:
print("usage: unbzcsv.py filename")
exit(1)
# build the format string used for generating output file names
out_file_name_fmt = "{}-{{}}.{}".format(*in_file_name.name.split(".", maxsplit=1))
out_file = None
date = b""
bzwriter = None
bzfile = None
# run bzip2 to decompress to stdout
bzreader = subp.Popen(["bzip2", "--decompress", "--stdout", in_file_name],
stdin=subp.DEVNULL, stdout=subp.PIPE)
# use tqdm to display progress as line count
progress = tqdm.tqdm(bzreader.stdout, desc="Lines", unit=" lines", unit_scale=True)
# read lines and fan out to files
try:
for line in progress:
# get date from, ex. "2021-05-01 00:00:00", timestamp
new_date = line.split(b"\t")[7].split(b" ")[0]
# roll to new file as needed, appending, so existing data not overwritten
if new_date != date:
date = new_date
out_file_name = out_file_name_fmt.format(date.decode("utf-8"))
if bzwriter is not None:
bzwriter.stdin.close()
bzwriter.wait()
bzwriter = None
bzfile.close()
print("\nwriting", out_file_name)
progress.refresh()
bzfile = open(out_file_name, "wb")
bzwriter = subp.Popen(["bzip2", "--compress"],
stdin=subp.PIPE, stdout=bzfile)
# write the row
bzwriter.stdin.write(line)
finally:
bzreader.terminate() # in case of error
if bzwriter:
bzwriter.stdin.close()
bzwriter.wait()
bzfile.close()
bzreader.wait()