hadoop 文件系统打开文件并跳过第一行
hadoop filesystem open file and skip first line
我正在使用 Python 语言读取 HDFS 中的文件。
每个文件都有一个 header,我正在尝试合并这些文件。但是,每个文件中的 header 也会被合并。
有没有办法从第二个文件中跳过 header?
hadoop = sc._jvm.org.apache.hadoop
conf = hadoop.conf.Configuration()
fs = hadoop.fs.FileSystem.get(conf)
src_dir = "/mnt/test/"
out_stream = fs.create(hadoop.fs.Path(dst_file), overwrite)
files = []
for f in fs.listStatus(hadoop.fs.Path(src_dir)):
if f.isFile():
files.append(f.getPath())
for file in files:
in_stream = fs.open(file)
hadoop.io.IOUtils.copyBytes(in_stream, out_stream, conf, False)
目前我已经通过以下逻辑解决了问题,但是想知道是否有更好更有效的解决方案?感谢您的帮助
for idx,file in enumerate(files):
if debug:
print("Appending file {} into {}".format(file, dst_file))
# remove header from the second file
if idx>0:
file_str = ""
with open('/'+str(file).replace(':',''),'r+') as f:
for idx,line in enumerate(f):
if idx>0:
file_str = file_str + line
with open('/'+str(file).replace(':',''), "w+") as f:
f.write(file_str)
in_stream = fs.open(file) # InputStream object and copy the stream
try:
hadoop.io.IOUtils.copyBytes(in_stream, out_stream, conf, False) # False means don't close out_stream
finally:
in_stream.close()
您现在正在做的是重复追加到一个字符串。这是一个相当缓慢的过程。为什么不在阅读时直接写入输出文件?
for file_idx, file in enumerate(files):
with open(...) as out_f, open(...) as in_f:
for line_num, line in enumerate(in_f):
if file_idx == 0 or line_num > 0:
f_out.write(line)
如果您可以一次加载所有文件,您也可以使用 readline
后跟 readlines
:
跳过第一行
for file_idx, file in enumerate(files):
with open(...) as out_f, open(...) as in_f:
if file_idx != 0:
f_in.readline()
f_out.writelines(f_in.readlines())
我正在使用 Python 语言读取 HDFS 中的文件。
每个文件都有一个 header,我正在尝试合并这些文件。但是,每个文件中的 header 也会被合并。
有没有办法从第二个文件中跳过 header?
hadoop = sc._jvm.org.apache.hadoop
conf = hadoop.conf.Configuration()
fs = hadoop.fs.FileSystem.get(conf)
src_dir = "/mnt/test/"
out_stream = fs.create(hadoop.fs.Path(dst_file), overwrite)
files = []
for f in fs.listStatus(hadoop.fs.Path(src_dir)):
if f.isFile():
files.append(f.getPath())
for file in files:
in_stream = fs.open(file)
hadoop.io.IOUtils.copyBytes(in_stream, out_stream, conf, False)
目前我已经通过以下逻辑解决了问题,但是想知道是否有更好更有效的解决方案?感谢您的帮助
for idx,file in enumerate(files):
if debug:
print("Appending file {} into {}".format(file, dst_file))
# remove header from the second file
if idx>0:
file_str = ""
with open('/'+str(file).replace(':',''),'r+') as f:
for idx,line in enumerate(f):
if idx>0:
file_str = file_str + line
with open('/'+str(file).replace(':',''), "w+") as f:
f.write(file_str)
in_stream = fs.open(file) # InputStream object and copy the stream
try:
hadoop.io.IOUtils.copyBytes(in_stream, out_stream, conf, False) # False means don't close out_stream
finally:
in_stream.close()
您现在正在做的是重复追加到一个字符串。这是一个相当缓慢的过程。为什么不在阅读时直接写入输出文件?
for file_idx, file in enumerate(files):
with open(...) as out_f, open(...) as in_f:
for line_num, line in enumerate(in_f):
if file_idx == 0 or line_num > 0:
f_out.write(line)
如果您可以一次加载所有文件,您也可以使用 readline
后跟 readlines
:
for file_idx, file in enumerate(files):
with open(...) as out_f, open(...) as in_f:
if file_idx != 0:
f_in.readline()
f_out.writelines(f_in.readlines())