来自 pyspark worker 的 HDFS / Hadoop api 访问

HDFS / Hadoop api access from pyspark worker

我需要从 pyspark worker 中 read/scan/write 文件 to/from hdfs。

请注意以下 api 不适用,因为它们 运行 来自 驱动程序

sc.textFile()
sc.saveAsParquetFile()

等等

最好不要涉及额外的第三方库(例如 pyhadoop)。

一个选择是 shell 出来,例如

 os.system('hdfs dfs -ls %(hdfsPath)s' %locals())

但是有没有更原生的 pyspark 方法来实现这个?

UPDATE 这不是广播数据的情况,因为每个worker会从hdfs读取不同的数据。其中一个用例是在每个 worker 中读取一些大型二进制文件(这显然不是广播的情况)。另一种情况是读取包含指令的 "command" 文件。我已经在原生 hadoop 和 scala spark 中成功地使用了这个模式。

更原生的 PySpark 方法是使用 sc.textFile() 或其他读取方法读取驱动程序中的数据,并将其作为 RDD 或广播变量传递给工作人员(如果它足够小以适合)到每个执行者的内存中。

你能描述一下你的情况吗,我怀疑你真的需要阅读 workers 中的文件

更新:

简短摘要:

  1. 直接从大集群上的 worker 读取文件集可能会杀死 namenode
  2. 在大多数情况下,不需要直接从 worker 读取单独的文件。您可以只为 textFile() 方法通配文件集或使用 wholeTextFiles()binaryFiles() 方法来读取文件集及其名称
  3. 在处理千兆字节图像的特定情况下,只需将它们放入序列文件并使用 sequenceFile() 方法读取它
  4. 使用 Python 直接从 HSFS 读取而无需额外的库可以通过直接查询 WebHDFS REST API 来实现,这是一种矫枉过正,因为这正是库所实现的。另一种选择可能是使用 pipe() Spark 方法调用 Java 程序读取 HDFS 文件并将它们以序列化形式返回到标准输出。另一种选择是通过转义到 shell 将文件从 HDFS 复制到临时 space,然后使用标准读取文件功能读取该文件。就个人而言,我会解雇我的开发人员,因为他实施了我在此处提出的任何方法

解决方案似乎是子进程输出(没有直接 python 访问)。将已接受的答案和评论之一拼凑在一起:Python read file as stream from HDFS

cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in iter(cat.stdout.readline, ''): 
    print line,   # include the comma