subprocess.Popen 在 ipyparallel 外部工作但在内部不工作?
subprocess.Popen works outside but not inside ipyparallel?
我正在尝试使用 ipyparallel
并行化来自 here 的一些代码。简而言之,我可以创建在 apply_sync()
之外正常工作的函数,但我似乎无法让它们在其中工作(我发誓我早些时候有这个工作,但我找不到未损坏的代码)。一个简单的例子:
def test3(fname = '7_1197_.txt'):
import subprocess
command = 'touch data/sentiment/' + fname + '.test'
child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
while True:
out = child.stdout.read(1)
if out == '' and child.poll() != None:
return
test3() #this works, creates a file with the .test extention
results = view.map_sync(test3, speeches) #this doesn't work. No files created.
这是我实际要使用的函数的简短版本。它本身工作正常。在 apply_sync()
中,它根据 htop
启动 java
个进程,但它似乎没有从这些进程中得到任何回报。
def test2(fname = '7_1197_.txt'):
import subprocess
settings = ' -mx5g edu.stanford.nlp.sentiment.SentimentPipeline'
inputFile = ' -file data/sentiment/' + fname
command = 'java ' + settings + inputFile
child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
results = []
while True:
out = child.stdout.read(1)
if out == '' and child.poll() != None:
return ''.join(results)
if out != '':
results.extend(out)
test2() #Works fine, produces output
results = view.map_sync(test2, speeches) #Doesn't work: the results are empty strings.
我尝试了一个 return 命令变量的版本。发送到 Popen
的命令很好,在命令行中手动粘贴时它们可以工作。我认为这可能只是管道问题,但更改命令以将输出重定向到带有 ' > '+fname+'.out'
的文件在 apply_sync()
调用中也不起作用(不生成输出文件)。
我应该怎么做才能从系统回调中得到 stdout
?
我看到两个潜在的陷阱。一种用于阻止,一种用于丢失文件。对于丢失的文件,你应该确保你的引擎和你的本地会话在同一个工作目录中,或者确保使用绝对路径。本地和远程同步路径的快速方法:
client[:].apply_sync(os.chdir, os.getcwd())
也就是说:获取local cwd,然后到处调用os.chdir
,这样我们就共享同一个工作目录。如果您在 IPython 会话中,一个快速的快捷方式是:
%px cd {os.getcwd()}
至于阻塞,我的第一个想法是:当运行并行时,您是否可能使用Python 3?如果是这样,child.stdout.read
returns bytes 而不是 text。在 Python 2 中,str is bytes
,所以 out == ''
会起作用,但在 Python 3 中,条件 out == ''
永远不会为真,因为 b'' != u''
,你的函数永远不会 return.
一些更有用的信息:
stdout.read(N)
将读取 最多 个字节数,如果输出完整则截断。这很有用,因为 read(1)
将循环 多次 次,即使输出都在等待读取。
如果输出完成,stdout.read()
只会 return 空字节串,所以你只需要检查它,而不是 returning 之前的 child.poll()
。 (只要你没有在FD上设置NOWAIT就可以,这是一些高级用法)。
- 如果你想看到函数returns之前的部分输出,你可以在sys.stdout上重新显示输出,并且在IPython中看到部分输出而不用等待最终结果.
下面是您的函数的几个实现,具有不同的目标。
第一个似乎可以使用 Popen.communicate 实现您当前的目标,如果您实际上不想对部分输出做任何事情,这是最简单的选择 and/or您正在等待输出的功能:
def simple(fname = '7_1197_.txt'):
import subprocess
command = 'echo "{0}" && touch -v data/sentiment/{0}.test'.format(fname)
child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
# if we aren't doing anything with partial outputs,
# child.communicate() does all of our waiting/capturing for us:
out, err = child.communicate()
return out
(使用 stderr=subprocess.PIPE
包含 stderr 捕获也可能有用,或者使用 stderr=subprocess.STDOUT
将 stderr 合并到 stdout 中)。
这是另一个例子,将 stderr 收集到 stdout,并分块读取:
def chunked(fname = '7_1197_.txt'):
import subprocess
command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
child = subprocess.Popen(command, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
chunks = []
while True:
chunk = child.stdout.read(80) # read roughly one line at a time
if chunk:
chunks.append(chunk)
continue
else:
# read will only return an empty bytestring when output is finished
break
return b''.join(chunks)
请注意,我们可以使用 if not chunk
条件来确定输出何时完成,而不是 if chunk == ''
,因为空字节串是假的。如果我们不对部分输出做些什么,那么真的没有理由使用它而不是上面更简单的 .communicate()
版本。
最后,这是一个可以与 IPython 一起使用的版本,它不是捕获和 return 输出,而是重新显示它,我们可以用它来显示 部分 在客户端输出:
def chunked_redisplayed(fname = '7_1197_.txt'):
import sys, subprocess
command = 'for i in {{1..20}}; do echo "{0}"; sleep 0.25; done'.format(fname)
child = subprocess.Popen(command, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
while True:
chunk = child.stdout.read(80) # read roughly one line at a time
if chunk:
sys.stdout.write(chunk.decode('utf8', 'replace'))
continue
else:
# read will only return an empty bytestring when output is finished
break
在客户端中,如果您使用 map_async
而不是 map_sync
,您可以检查 result.stdout
,这是一个标准输出流列表 so far,所以你可以查看进度:
amr = view.map_async(chunked_redisplayed, speeches)
amr.stdout # list of stdout text, updated in the background as output is produced
amr.wait_interactive() # waits and shows progress
amr.get() # waits for and returns the actual result
我正在尝试使用 ipyparallel
并行化来自 here 的一些代码。简而言之,我可以创建在 apply_sync()
之外正常工作的函数,但我似乎无法让它们在其中工作(我发誓我早些时候有这个工作,但我找不到未损坏的代码)。一个简单的例子:
def test3(fname = '7_1197_.txt'):
import subprocess
command = 'touch data/sentiment/' + fname + '.test'
child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
while True:
out = child.stdout.read(1)
if out == '' and child.poll() != None:
return
test3() #this works, creates a file with the .test extention
results = view.map_sync(test3, speeches) #this doesn't work. No files created.
这是我实际要使用的函数的简短版本。它本身工作正常。在 apply_sync()
中,它根据 htop
启动 java
个进程,但它似乎没有从这些进程中得到任何回报。
def test2(fname = '7_1197_.txt'):
import subprocess
settings = ' -mx5g edu.stanford.nlp.sentiment.SentimentPipeline'
inputFile = ' -file data/sentiment/' + fname
command = 'java ' + settings + inputFile
child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
results = []
while True:
out = child.stdout.read(1)
if out == '' and child.poll() != None:
return ''.join(results)
if out != '':
results.extend(out)
test2() #Works fine, produces output
results = view.map_sync(test2, speeches) #Doesn't work: the results are empty strings.
我尝试了一个 return 命令变量的版本。发送到 Popen
的命令很好,在命令行中手动粘贴时它们可以工作。我认为这可能只是管道问题,但更改命令以将输出重定向到带有 ' > '+fname+'.out'
的文件在 apply_sync()
调用中也不起作用(不生成输出文件)。
我应该怎么做才能从系统回调中得到 stdout
?
我看到两个潜在的陷阱。一种用于阻止,一种用于丢失文件。对于丢失的文件,你应该确保你的引擎和你的本地会话在同一个工作目录中,或者确保使用绝对路径。本地和远程同步路径的快速方法:
client[:].apply_sync(os.chdir, os.getcwd())
也就是说:获取local cwd,然后到处调用os.chdir
,这样我们就共享同一个工作目录。如果您在 IPython 会话中,一个快速的快捷方式是:
%px cd {os.getcwd()}
至于阻塞,我的第一个想法是:当运行并行时,您是否可能使用Python 3?如果是这样,child.stdout.read
returns bytes 而不是 text。在 Python 2 中,str is bytes
,所以 out == ''
会起作用,但在 Python 3 中,条件 out == ''
永远不会为真,因为 b'' != u''
,你的函数永远不会 return.
一些更有用的信息:
stdout.read(N)
将读取 最多 个字节数,如果输出完整则截断。这很有用,因为read(1)
将循环 多次 次,即使输出都在等待读取。
如果输出完成,stdout.read()
只会 return 空字节串,所以你只需要检查它,而不是 returning 之前的child.poll()
。 (只要你没有在FD上设置NOWAIT就可以,这是一些高级用法)。- 如果你想看到函数returns之前的部分输出,你可以在sys.stdout上重新显示输出,并且在IPython中看到部分输出而不用等待最终结果.
下面是您的函数的几个实现,具有不同的目标。
第一个似乎可以使用 Popen.communicate 实现您当前的目标,如果您实际上不想对部分输出做任何事情,这是最简单的选择 and/or您正在等待输出的功能:
def simple(fname = '7_1197_.txt'):
import subprocess
command = 'echo "{0}" && touch -v data/sentiment/{0}.test'.format(fname)
child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
# if we aren't doing anything with partial outputs,
# child.communicate() does all of our waiting/capturing for us:
out, err = child.communicate()
return out
(使用 stderr=subprocess.PIPE
包含 stderr 捕获也可能有用,或者使用 stderr=subprocess.STDOUT
将 stderr 合并到 stdout 中)。
这是另一个例子,将 stderr 收集到 stdout,并分块读取:
def chunked(fname = '7_1197_.txt'):
import subprocess
command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
child = subprocess.Popen(command, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
chunks = []
while True:
chunk = child.stdout.read(80) # read roughly one line at a time
if chunk:
chunks.append(chunk)
continue
else:
# read will only return an empty bytestring when output is finished
break
return b''.join(chunks)
请注意,我们可以使用 if not chunk
条件来确定输出何时完成,而不是 if chunk == ''
,因为空字节串是假的。如果我们不对部分输出做些什么,那么真的没有理由使用它而不是上面更简单的 .communicate()
版本。
最后,这是一个可以与 IPython 一起使用的版本,它不是捕获和 return 输出,而是重新显示它,我们可以用它来显示 部分 在客户端输出:
def chunked_redisplayed(fname = '7_1197_.txt'):
import sys, subprocess
command = 'for i in {{1..20}}; do echo "{0}"; sleep 0.25; done'.format(fname)
child = subprocess.Popen(command, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
while True:
chunk = child.stdout.read(80) # read roughly one line at a time
if chunk:
sys.stdout.write(chunk.decode('utf8', 'replace'))
continue
else:
# read will only return an empty bytestring when output is finished
break
在客户端中,如果您使用 map_async
而不是 map_sync
,您可以检查 result.stdout
,这是一个标准输出流列表 so far,所以你可以查看进度:
amr = view.map_async(chunked_redisplayed, speeches)
amr.stdout # list of stdout text, updated in the background as output is produced
amr.wait_interactive() # waits and shows progress
amr.get() # waits for and returns the actual result