在 PySpark 中涉及带有管道的子进程的映射步骤失败

Map step that involves subprocess with pipe fails in PySpark

我的目标是读取由 csv 数据组成的 hdfs 上的二进制(gpg 加密)文件。我的方法——遵循 ——定义一个 Python 函数来读取和解密 gpg 文件,产生每一行,并将此函数作为 flatMap 应用到并行文件列表。

本质上,Python 函数生成一个子进程,该子进程使用 hadoop 读取文件并将结果通过管道传输到 gpg 进行解密。这在 运行 本地模式下的 Spark 时工作得很好。但是,运行 它分发了 (yarn-client),一个简单的行计数 returns 0,本质上是因为 Python 认为 stdout 管道总是关闭的.

问题似乎是子进程涉及两个命令之间的管道。当我删除后者时(只是加密文件的行数),行数与我在命令行上得到的相匹配。我已经尝试了多种方法,结果都是一样的。

这是 Python 函数:

import subprocess as sp

def read_gpg_file_on_hdfs(filename):
    # Method 1:
    p = sp.Popen('hadoop fs -cat {} | gpg -d'.format(filename), shell=True,
                 stdout=sp.PIPE)
    # Method 2:
    p1 = sp.Popen(['hadoop', 'fs', '-cat', filename], stdout=sp.PIPE)
    p = sp.Popen(['gpg', '-d'], stdin=p1.stdout, stdout=sp.PIPE)
    p1.stdout.close()

    # Method 3:
    p = sp.Ppen('gpg -d <(hadoop fs -cat {})'.format(filename), shell=True,
                stdout=sp.PIPE, stderr=sp.PIPE)

    for line in p.stdout:
        yield line.strip()

这是 Spark 命令:

sc.parallelize(['/path/to/file.gpg']).flatMap(read_gpg_file_on_hdfs).count()

现在我知道 PySpark 使用管道与 Spark 通信,但我没有遵循细节,我不知道这是否会影响我正在尝试做的事情。我的问题是是否有办法完成我想做的事情。

请注意,我使用的是分布式 Spark 1.2.1(MapR 的最新版本)。另外,我考虑过使用 binaryFiles,但是对于我有时会遇到的大型 gpg 文件,这会失败。

提前致谢!

事实证明 gpg 命令实际上是问题所在。据推测,这与子进程如何在本地模式和分布式模式下启动的细节有关,但在本地模式下,gpghomedir 设置正确。但是当以分布式模式启动时,homedir 指向一个不正确的目录,第二个子进程立即失败。此错误消息似乎没有记录在任何地方,因此 stdout 只是作为空字符串返回。