来自 pyspark worker 的 HDFS / Hadoop api 访问
HDFS / Hadoop api access from pyspark worker
我需要从 pyspark worker 中 read/scan/write 文件 to/from hdfs。
请注意以下 api 不适用,因为它们 运行 来自 驱动程序 :
sc.textFile()
sc.saveAsParquetFile()
等等
最好不要涉及额外的第三方库(例如 pyhadoop)。
一个选择是 shell 出来,例如
os.system('hdfs dfs -ls %(hdfsPath)s' %locals())
但是有没有更原生的 pyspark 方法来实现这个?
UPDATE 这不是广播数据的情况,因为每个worker会从hdfs读取不同的数据。其中一个用例是在每个 worker 中读取一些大型二进制文件(这显然不是广播的情况)。另一种情况是读取包含指令的 "command" 文件。我已经在原生 hadoop 和 scala spark 中成功地使用了这个模式。
更原生的 PySpark 方法是使用 sc.textFile()
或其他读取方法读取驱动程序中的数据,并将其作为 RDD 或广播变量传递给工作人员(如果它足够小以适合)到每个执行者的内存中。
你能描述一下你的情况吗,我怀疑你真的需要阅读 workers 中的文件
更新:
简短摘要:
- 直接从大集群上的 worker 读取文件集可能会杀死 namenode
- 在大多数情况下,不需要直接从 worker 读取单独的文件。您可以只为
textFile()
方法通配文件集或使用 wholeTextFiles()
或 binaryFiles()
方法来读取文件集及其名称
- 在处理千兆字节图像的特定情况下,只需将它们放入序列文件并使用
sequenceFile()
方法读取它
- 使用 Python 直接从 HSFS 读取而无需额外的库可以通过直接查询 WebHDFS REST API 来实现,这是一种矫枉过正,因为这正是库所实现的。另一种选择可能是使用
pipe()
Spark 方法调用 Java 程序读取 HDFS 文件并将它们以序列化形式返回到标准输出。另一种选择是通过转义到 shell 将文件从 HDFS 复制到临时 space,然后使用标准读取文件功能读取该文件。就个人而言,我会解雇我的开发人员,因为他实施了我在此处提出的任何方法
解决方案似乎是子进程输出(没有直接 python 访问)。将已接受的答案和评论之一拼凑在一起:Python read file as stream from HDFS
cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in iter(cat.stdout.readline, ''):
print line, # include the comma
我需要从 pyspark worker 中 read/scan/write 文件 to/from hdfs。
请注意以下 api 不适用,因为它们 运行 来自 驱动程序 :
sc.textFile()
sc.saveAsParquetFile()
等等
最好不要涉及额外的第三方库(例如 pyhadoop)。
一个选择是 shell 出来,例如
os.system('hdfs dfs -ls %(hdfsPath)s' %locals())
但是有没有更原生的 pyspark 方法来实现这个?
UPDATE 这不是广播数据的情况,因为每个worker会从hdfs读取不同的数据。其中一个用例是在每个 worker 中读取一些大型二进制文件(这显然不是广播的情况)。另一种情况是读取包含指令的 "command" 文件。我已经在原生 hadoop 和 scala spark 中成功地使用了这个模式。
更原生的 PySpark 方法是使用 sc.textFile()
或其他读取方法读取驱动程序中的数据,并将其作为 RDD 或广播变量传递给工作人员(如果它足够小以适合)到每个执行者的内存中。
你能描述一下你的情况吗,我怀疑你真的需要阅读 workers 中的文件
更新:
简短摘要:
- 直接从大集群上的 worker 读取文件集可能会杀死 namenode
- 在大多数情况下,不需要直接从 worker 读取单独的文件。您可以只为
textFile()
方法通配文件集或使用wholeTextFiles()
或binaryFiles()
方法来读取文件集及其名称 - 在处理千兆字节图像的特定情况下,只需将它们放入序列文件并使用
sequenceFile()
方法读取它 - 使用 Python 直接从 HSFS 读取而无需额外的库可以通过直接查询 WebHDFS REST API 来实现,这是一种矫枉过正,因为这正是库所实现的。另一种选择可能是使用
pipe()
Spark 方法调用 Java 程序读取 HDFS 文件并将它们以序列化形式返回到标准输出。另一种选择是通过转义到 shell 将文件从 HDFS 复制到临时 space,然后使用标准读取文件功能读取该文件。就个人而言,我会解雇我的开发人员,因为他实施了我在此处提出的任何方法
解决方案似乎是子进程输出(没有直接 python 访问)。将已接受的答案和评论之一拼凑在一起:Python read file as stream from HDFS
cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in iter(cat.stdout.readline, ''):
print line, # include the comma