subprocess.Popen 在 ipyparallel 外部工作但在内部不工作?

subprocess.Popen works outside but not inside ipyparallel?

我正在尝试使用 ipyparallel 并行化来自 here 的一些代码。简而言之,我可以创建在 apply_sync() 之外正常工作的函数,但我似乎无法让它们在其中工作(我发誓我早些时候有这个工作,但我找不到未损坏的代码)。一个简单的例子:

def test3(fname = '7_1197_.txt'):
    import subprocess
    command = 'touch data/sentiment/' + fname + '.test'
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    while True:
        out = child.stdout.read(1)
        if out == '' and child.poll() != None:
            return 
test3() #this works, creates a file with the .test extention
results = view.map_sync(test3, speeches) #this doesn't work. No files created.

这是我实际要使用的函数的简短版本。它本身工作正常。在 apply_sync() 中,它根据 htop 启动 java 个进程,但它似乎没有从这些进程中得到任何回报。

def test2(fname = '7_1197_.txt'):
    import subprocess

    settings = ' -mx5g edu.stanford.nlp.sentiment.SentimentPipeline'
    inputFile = ' -file data/sentiment/' + fname
    command = 'java ' + settings + inputFile
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    results = []
    while True:
        out = child.stdout.read(1)
        if out == '' and child.poll() != None:
            return ''.join(results)
        if out != '':
            results.extend(out)
test2() #Works fine, produces output
results = view.map_sync(test2, speeches) #Doesn't work: the results are empty strings.

我尝试了一个 return 命令变量的版本。发送到 Popen 的命令很好,在命令行中手动粘贴时它们可以工作。我认为这可能只是管道问题,但更改命令以将输出重定向到带有 ' > '+fname+'.out' 的文件在 apply_sync() 调用中也不起作用(不生成输出文件)。

我应该怎么做才能从系统回调中得到 stdout

我看到两个潜在的陷阱。一种用于阻止,一种用于丢失文件。对于丢失的文件,你应该确保你的引擎和你的本地会话在同一个工作目录中,或者确保使用绝对路径。本地和远程同步路径的快速方法:

client[:].apply_sync(os.chdir, os.getcwd())

也就是说:获取local cwd,然后到处调用os.chdir,这样我们就共享同一个工作目录。如果您在 IPython 会话中,一个快速的快捷方式是:

%px cd {os.getcwd()}

至于阻塞,我的第一个想法是:当运行并行时,您是否可能使用Python 3?如果是这样,child.stdout.read returns bytes 而不是 text。在 Python 2 中,str is bytes,所以 out == '' 会起作用,但在 Python 3 中,条件 out == '' 永远不会为真,因为 b'' != u'',你的函数永远不会 return.

一些更有用的信息:

  1. stdout.read(N) 将读取 最多 个字节数,如果输出完整则截断。这很有用,因为 read(1) 将循环 多次 次,即使输出都在等待读取。
  2. 如果输出完成,
  3. stdout.read() 只会 return 空字节串,所以你只需要检查它,而不是 returning 之前的 child.poll()。 (只要你没有在FD上设置NOWAIT就可以,这是一些高级用法)。
  4. 如果你想看到函数returns之前的部分输出,你可以在sys.stdout上重新显示输出,并且在IPython中看到部分输出而不用等待最终结果.

下面是您的函数的几个实现,具有不同的目标。

第一个似乎可以使用 Popen.communicate 实现您当前的目标,如果您实际上不想对部分输出做任何事情,这是最简单的选择 and/or您正在等待输出的功能:

def simple(fname = '7_1197_.txt'):
    import subprocess
    command = 'echo "{0}" && touch -v data/sentiment/{0}.test'.format(fname)
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    # if we aren't doing anything with partial outputs,
    # child.communicate() does all of our waiting/capturing for us:
    out, err = child.communicate()
    return out

(使用 stderr=subprocess.PIPE 包含 stderr 捕获也可能有用,或者使用 stderr=subprocess.STDOUT 将 stderr 合并到 stdout 中)。

这是另一个例子,将 stderr 收集到 stdout,并分块读取:

def chunked(fname = '7_1197_.txt'):
    import subprocess
    command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
    child = subprocess.Popen(command, shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                            )
    chunks = []
    while True:
        chunk = child.stdout.read(80) # read roughly one line at a time
        if chunk:
            chunks.append(chunk)
            continue
        else:
            # read will only return an empty bytestring when output is finished
            break
    return b''.join(chunks)

请注意,我们可以使用 if not chunk 条件来确定输出何时完成,而不是 if chunk == '',因为空字节串是假的。如果我们不对部分输出做些什么,那么真的没有理由使用它而不是上面更简单的 .communicate() 版本。

最后,这是一个可以与 IPython 一起使用的版本,它不是捕获和 return 输出,而是重新显示它,我们可以用它来显示 部分 在客户端输出:

def chunked_redisplayed(fname = '7_1197_.txt'):
    import sys, subprocess
    command = 'for i in {{1..20}}; do echo "{0}"; sleep 0.25; done'.format(fname)
    child = subprocess.Popen(command, shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                            )
    while True:
        chunk = child.stdout.read(80) # read roughly one line at a time
        if chunk:
            sys.stdout.write(chunk.decode('utf8', 'replace'))
            continue
        else:
            # read will only return an empty bytestring when output is finished
            break

在客户端中,如果您使用 map_async 而不是 map_sync,您可以检查 result.stdout,这是一个标准输出流列表 so far,所以你可以查看进度:

amr = view.map_async(chunked_redisplayed, speeches)
amr.stdout # list of stdout text, updated in the background as output is produced
amr.wait_interactive() # waits and shows progress
amr.get() # waits for and returns the actual result