MapReduce with paramiko how to print stdout as it streams
I created a small Python script using paramiko that lets me run MapReduce jobs without having to use PuTTY or a cmd window to kick off the job. This works great, except that I don't see any stdout until the job finishes. How can I set this up so that I see each line of stdout as it is generated, just like I would in a cmd window?
Here is my script:
import paramiko
# Define connection info
host_ip = 'xx.xx.xx.xx'
user = 'xxxxxxxxx'
pw = 'xxxxxxxxx'
# Commands
list_dir = "ls /nfs_home/appers/cnielsen -l"
MR = "hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -files /nfs_home/appers/cnielsen/product_lookups.xml -file /nfs_home/appers/cnielsen/Mapper.py -file /nfs_home/appers/cnielsen/Reducer.py -mapper '/usr/lib/python_2.7.3/bin/python Mapper.py test1' -file /nfs_home/appers/cnielsen/Process.py -reducer '/usr/lib/python_2.7.3/bin/python Reducer.py' -input /nfs_home/appers/extracts/*/*.xml -output /user/loc/output/cnielsen/test51"
getmerge = "hadoop fs -getmerge /user/loc/output/cnielsen/test51 /nfs_home/appers/cnielsen/test_010716_0.txt"
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host_ip, username=user, password=pw)
##stdin, stdout, stderr = client.exec_command(list_dir)
##stdin, stdout, stderr = client.exec_command(getmerge)
stdin, stdout, stderr = client.exec_command(MR)
print "Executing command..."
for line in stdout:
    print '... ' + line.strip('\n')
for l in stderr:
    print '... ' + l.strip('\n')
client.close()
This code implicitly calls stdout.read(), which blocks until EOF. Therefore you have to read stdout/stderr in chunks to get the output immediately. This answer, and especially a modified version of this answer, should help you resolve this issue. I'd recommend adapting answer 2 to your use-case to prevent some common stalling situations.
Here's an example adapted from answer 1:
sin, sout, serr = ssh.exec_command("while true; do uptime; done")

def line_buffered(f):
    # Accumulate output one byte at a time and yield complete lines
    # as they arrive, instead of blocking until EOF like f.read() would.
    line_buf = ""
    while not f.channel.exit_status_ready():
        line_buf += f.read(1)
        if line_buf.endswith('\n'):
            yield line_buf
            line_buf = ''

for l in line_buffered(sout):  # or serr
    print l
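Note that for the MapReduce command above this can still look stalled, because the hadoop client typically writes its progress lines (the map/reduce percentages) to stderr, not stdout. Below is a minimal sketch along the lines of the "answer 2" approach mentioned above: instead of blocking reads, it polls both streams on the underlying channel. It assumes the client and MR variables from the question's script; the 1024-byte chunk size and 0.1-second polling interval are arbitrary choices. recv_ready(), recv_stderr_ready(), recv() and recv_stderr() are standard paramiko Channel methods.

import sys
import time

stdin, stdout, stderr = client.exec_command(MR)
channel = stdout.channel  # the underlying paramiko.Channel, shared by all three streams

# Poll the channel instead of blocking on read(); keep draining until
# the command has exited AND no buffered bytes remain on either stream.
while not channel.exit_status_ready() or channel.recv_ready() or channel.recv_stderr_ready():
    if channel.recv_ready():
        sys.stdout.write(channel.recv(1024))
    if channel.recv_stderr_ready():
        sys.stdout.write(channel.recv_stderr(1024))  # Hadoop progress output usually arrives here
    sys.stdout.flush()
    time.sleep(0.1)  # arbitrary polling interval to avoid busy-waiting

print "Exit status:", channel.recv_exit_status()

Since recv() returns whatever happens to be buffered at that moment, lines can arrive in fragments; apply line buffering as in the generator above if you need whole lines.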