从 GCS 读取 numpy 数组到 spark
reading numy array from GCS into spark
我在 google 存储中有 100 个包含 numpy 数组的 npz 文件。
我已经用 jupyter 设置了 dataproc,我正在尝试将所有 numpy 数组读入 spark RDD。将 numpy 数组从 google 存储加载到 pyspark 的最佳方法是什么?
有没有像 np.load("gs://path/to/array.npz")
这样的简单方法来加载 numpy 数组,然后对其执行 sc.parallelize
?
如果您最终计划扩展,您将希望使用 SparkContext
中的分布式输入方法,而不是依赖 sc.parallelize
从驱动程序加载任何本地文件。听起来你需要完整地阅读每个文件,所以在你的情况下你想要:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/")
或者您也可以根据需要指定单个文件,但那样您只有一个包含单个元素的 RDD:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/arr1.npz")
那么每条记录就是一对<filename>,<str of bytes>
。在 Dataproc 上,sc.binaryFiles
将自动直接使用 GCS 路径,这与需要本地文件系统路径的 np.load
不同。
然后在你的工作代码中,你只需要使用StringIO
将那些字节字符串用作你放入np.load
的文件对象:
from StringIO import StringIO
# For example, to create an RDD of the 'arr_0' element of each of the picked objects:
npz_rdd.map(lambda l: numpy.load(StringIO(l[1]))['arr_0'])
在开发过程中,如果你真的只想将文件读入你的主驱动程序,你总是可以使用 collect()
折叠你的 RDD 以在本地检索它:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/arr1.npz")
local_bytes = npz_rdd.collect()[0][1]
local_np_obj = np.load(StringIO(local_bytes))
我在 google 存储中有 100 个包含 numpy 数组的 npz 文件。
我已经用 jupyter 设置了 dataproc,我正在尝试将所有 numpy 数组读入 spark RDD。将 numpy 数组从 google 存储加载到 pyspark 的最佳方法是什么?
有没有像 np.load("gs://path/to/array.npz")
这样的简单方法来加载 numpy 数组,然后对其执行 sc.parallelize
?
如果您最终计划扩展,您将希望使用 SparkContext
中的分布式输入方法,而不是依赖 sc.parallelize
从驱动程序加载任何本地文件。听起来你需要完整地阅读每个文件,所以在你的情况下你想要:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/")
或者您也可以根据需要指定单个文件,但那样您只有一个包含单个元素的 RDD:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/arr1.npz")
那么每条记录就是一对<filename>,<str of bytes>
。在 Dataproc 上,sc.binaryFiles
将自动直接使用 GCS 路径,这与需要本地文件系统路径的 np.load
不同。
然后在你的工作代码中,你只需要使用StringIO
将那些字节字符串用作你放入np.load
的文件对象:
from StringIO import StringIO
# For example, to create an RDD of the 'arr_0' element of each of the picked objects:
npz_rdd.map(lambda l: numpy.load(StringIO(l[1]))['arr_0'])
在开发过程中,如果你真的只想将文件读入你的主驱动程序,你总是可以使用 collect()
折叠你的 RDD 以在本地检索它:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/arr1.npz")
local_bytes = npz_rdd.collect()[0][1]
local_np_obj = np.load(StringIO(local_bytes))