Hadoop中高效的拷贝方法
Efficient copy method in Hadoop
除了 distcp
之外,是否有更快或更有效的跨 HDFS 文件复制方法。我尝试了常规 hadoop fs -cp
和 distcp
,两者似乎都提供相同的传输速率,大约 50 MBPS。
我将 5TB 的数据拆分为每个 500GB 的较小文件,我必须将这些文件复制到 HDFS 上的新位置。有什么想法吗?
编辑:
原来的 distcp
只生成 1 个映射器,所以我添加了 -m100
选项来增加映射器
hadoop distcp -D mapred.job.name="Gigafiles distcp" -pb -i -m100 "/user/abc/file1" "/xyz/aaa/file1"
但它仍然只产生 1 个而不是 100 个映射器。我在这里遗漏了什么吗?
我能够通过使用 pig 脚本从路径 A 读取数据,转换为 parquet(无论如何这是所需的存储格式)并将其写入路径 B 来解决这个问题。这个过程花了将近 20 分钟平均 500GB 文件。谢谢你的建议。
如果您想将文件的子集从一个文件夹复制到 HDFS 中的另一个文件夹,我想到了这个。它可能不如 distcp
高效,但可以完成工作并为您提供更多自由,以防您想进行其他操作。它还会检查每个文件是否已存在:
import pandas as pd
import os
from multiprocessing import Process
from subprocess import Popen, PIPE
hdfs_path_1 = '/path/to/the/origin/'
hdfs_path_2 = '/path/to/the/destination/'
process = Popen(f'hdfs dfs -ls -h {hdfs_path_2}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
already_processed = [fn.split()[-1].split('/')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
print(f'Total number of ALREADY PROCESSED tar files = {len(already_processed)}')
df = pd.read_csv("list_of_files.csv") # or any other lists that you have
to_do_tar_list = list(df.tar)
to_do_list = set(to_do_tar_list) - set(already_processed)
print(f'To go: {len(to_do_list)}')
def copyy(f):
process = Popen(f'hdfs dfs -cp {hdfs_path_1}{f} {hdfs_path_2}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
if std_out!= b'':
print(std_out)
ps = []
for f in to_do_list:
p = Process(target=copyy, args=(f,))
p.start()
ps.append(p)
for p in ps:
p.join()
print('done')
此外,如果您想获得目录中所有文件的列表,请使用:
from subprocess import Popen, PIPE
hdfs_path = '/path/to/the/designated/folder'
process = Popen(f'hdfs dfs -ls -h {hdfs_path}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
list_of_file_names = [fn.split(' ')[-1].split('/')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
list_of_file_names_with_full_address = [fn.split(' ')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
除了 distcp
之外,是否有更快或更有效的跨 HDFS 文件复制方法。我尝试了常规 hadoop fs -cp
和 distcp
,两者似乎都提供相同的传输速率,大约 50 MBPS。
我将 5TB 的数据拆分为每个 500GB 的较小文件,我必须将这些文件复制到 HDFS 上的新位置。有什么想法吗?
编辑:
原来的 distcp
只生成 1 个映射器,所以我添加了 -m100
选项来增加映射器
hadoop distcp -D mapred.job.name="Gigafiles distcp" -pb -i -m100 "/user/abc/file1" "/xyz/aaa/file1"
但它仍然只产生 1 个而不是 100 个映射器。我在这里遗漏了什么吗?
我能够通过使用 pig 脚本从路径 A 读取数据,转换为 parquet(无论如何这是所需的存储格式)并将其写入路径 B 来解决这个问题。这个过程花了将近 20 分钟平均 500GB 文件。谢谢你的建议。
如果您想将文件的子集从一个文件夹复制到 HDFS 中的另一个文件夹,我想到了这个。它可能不如 distcp
高效,但可以完成工作并为您提供更多自由,以防您想进行其他操作。它还会检查每个文件是否已存在:
import pandas as pd
import os
from multiprocessing import Process
from subprocess import Popen, PIPE
hdfs_path_1 = '/path/to/the/origin/'
hdfs_path_2 = '/path/to/the/destination/'
process = Popen(f'hdfs dfs -ls -h {hdfs_path_2}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
already_processed = [fn.split()[-1].split('/')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
print(f'Total number of ALREADY PROCESSED tar files = {len(already_processed)}')
df = pd.read_csv("list_of_files.csv") # or any other lists that you have
to_do_tar_list = list(df.tar)
to_do_list = set(to_do_tar_list) - set(already_processed)
print(f'To go: {len(to_do_list)}')
def copyy(f):
process = Popen(f'hdfs dfs -cp {hdfs_path_1}{f} {hdfs_path_2}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
if std_out!= b'':
print(std_out)
ps = []
for f in to_do_list:
p = Process(target=copyy, args=(f,))
p.start()
ps.append(p)
for p in ps:
p.join()
print('done')
此外,如果您想获得目录中所有文件的列表,请使用:
from subprocess import Popen, PIPE
hdfs_path = '/path/to/the/designated/folder'
process = Popen(f'hdfs dfs -ls -h {hdfs_path}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
list_of_file_names = [fn.split(' ')[-1].split('/')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
list_of_file_names_with_full_address = [fn.split(' ')[-1] for fn in std_out.decode().readlines()[1:]][:-1]