How to run PSCP cmd window step in my Python script
I am running Hadoop MapReduce and other SSH commands from a Python script using the paramiko module (code shown below). Once the MapReduce job completes, I run a getmerge step to collect the output into a single text file.
The problem is that I then have to open a cmd window and run PSCP to download the output text file from the HDFS environment to my computer. For example:
pscp xxxx@xx.xx.xx.xx:/nfs_home/appers/cnielsen/MROutput_121815_0.txt C:\Users\cnielsen\Desktop\MR_Test
How can I incorporate this pscp step into my script so that I don't have to open a cmd window and run it by hand after my MapReduce and getmerge tasks complete? I would like my script to run the MR task, run the getmerge task, and then automatically save the MR output to my computer.
Here is my script.
I solved this with the code below. The trick is to use the scp module and import SCPClient. See the scp_download(ssh) function below.
When the MapReduce job completes, the getmerge command is run, followed by the scp_download step.
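Note that scp is a small third-party package that layers file transfers on top of an existing paramiko transport; if it isn't installed yet, install it first:

pip install scp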
import paramiko
from scp import SCPClient
import time
# Define connection info
host_ip = 'xx.xx.xx.xx'
user = 'xxxxxxxx'
pw = 'xxxxxxxx'
port = 22
# Paths
input_loc = '/nfs_home/appers/extracts/*/*.xml'
output_loc = '/user/lcmsprod/output/cnielsen/'
python_path = "/usr/lib/python_2.7.3/bin/python"
hdfs_home = '/nfs_home/appers/cnielsen/'
output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt'
local_dir = r'C:\Users\cnielsen\Desktop\MR_Test'  # local download target used by scp_download()
# File names
xml_lookup_file = 'product_lookups.xml'
mapper = 'Mapper.py'
reducer = 'Reducer.py'
helper_script = 'Process.py'
product_name = 'test1'
output_ref = 'test65'
target_file = 'test_011416_3.txt'
# ----------------------------------------------------
def createSSHClient(host_ip, port, user, pw):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    # Automatically trust unknown host keys (convenient, but skips host verification)
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host_ip, port, user, pw)
    return client
# ----------------------------------------------------
def buildMRcommand(product_name):
    space = " "
    mr_command_list = ['hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar',
                       '-files', hdfs_home+xml_lookup_file,
                       '-file', hdfs_home+mapper,
                       '-file', hdfs_home+reducer,
                       '-mapper', "'"+python_path, mapper, product_name+"'",
                       '-file', hdfs_home+helper_script,
                       '-reducer', "'"+python_path, reducer+"'",
                       '-input', input_loc,
                       '-output', output_loc+output_ref]
    MR_command = space.join(mr_command_list)
    print MR_command
    return MR_command
# ----------------------------------------------------
def unbuffered_lines(f):
    # Yield complete lines from the stream as they arrive, without waiting for EOF
    line_buf = ""
    while not f.channel.exit_status_ready():
        line_buf += f.read(1)
        if line_buf.endswith('\n'):
            yield line_buf
            line_buf = ""
# ----------------------------------------------------
def stream_output(stdin, stdout, stderr):
    writer = open(output_log, 'w')
    # Using line_buffer function
    for l in unbuffered_lines(stderr):
        e = '[stderr] ' + l
        print '[stderr] ' + l.strip('\n')
        writer.write(e)
    # gives full listing..
    for line in stdout:
        r = '[stdout]' + line
        print '[stdout]' + line.strip('\n')
        writer.write(r)
    writer.close()
# ----------------------------------------------------
def run_MapReduce(ssh):
    stdin, stdout, stderr = ssh.exec_command(buildMRcommand(product_name))
    stream_output(stdin, stdout, stderr)
    return 1
# ----------------------------------------------------
def run_list_dir(ssh):
    list_dir = "ls "+hdfs_home+" -l"
    stdin, stdout, stderr = ssh.exec_command(list_dir)
    stream_output(stdin, stdout, stderr)
# ----------------------------------------------------
def run_getmerge(ssh):
    getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+target_file
    print getmerge
    stdin, stdout, stderr = ssh.exec_command(getmerge)
    for line in stdout:
        print '[stdout]' + line.strip('\n')
    time.sleep(1.5)
    return 1
# ----------------------------------------------------
def scp_download(ssh):
    scp = SCPClient(ssh.get_transport())
    print "Fetching SCP data.."
    scp.get(hdfs_home+target_file, local_dir)
    print "File download complete."
# ----------------------------------------------------
def main():
    # Get the ssh connection
    global ssh
    ssh = createSSHClient(host_ip, port, user, pw)
    print "Executing command..."
    # Command list
    ##run_list_dir(ssh)
    ##run_getmerge(ssh)
    ##scp_download(ssh)
    # Run MapReduce
    MR_status = 0
    MR_status = run_MapReduce(ssh)
    if MR_status == 1:
        gs = 0
        gs = run_getmerge(ssh)
        if gs == 1:
            scp_download(ssh)
    # Close ssh connection
    ssh.close()

if __name__ == '__main__':
    main()
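As an aside: if pulling in the scp package is not an option, paramiko's built-in SFTP client can do the same download over the already-authenticated connection, so there is still no need to shell out to pscp. A minimal sketch, reusing the hdfs_home, target_file, and local_dir globals from the script above (sftp_download is a hypothetical name, not part of the original script):

import os

def sftp_download(ssh):
    # Open an SFTP session on the existing, already-authenticated connection
    sftp = ssh.open_sftp()
    try:
        # SFTPClient.get() expects a full local file path, not just a directory
        sftp.get(hdfs_home + target_file, os.path.join(local_dir, target_file))
    finally:
        sftp.close()

Either way the transfer rides on the single SSH session the script already opened, which avoids both a second login and the Windows-only pscp.exe dependency.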