如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?

How to access Amazon kinesis streams files in S3 in my pyspark code?

我在 S3 上获得了一个存储桶,其中包含使用 'snappy' 压缩格式的 Kinesis 流文件。文件夹结构和文件格式为 s3://<mybucket>/yyyy/mm/dd/*.snappy.

我正在尝试将其读入 pyspark 中的 sqlContext。通常,我会将存储桶指定为:

df = sqlContext.read.json('s3://<mybucket>/inputfile.json')

如何将所有这些多部分压缩文件放入我的数据框中?

更新:似乎我使用相同的构造取得了更多进展。但是,运行进入堆大小问题:

#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p
kill -9 %p"
#   Executing /bin/sh -c "kill 6128
kill -9 6128"...

数据量并没有那么大,但这个解压步骤似乎让事情变得更糟了。

如果您想获取所有天/月/年的所有活泼文件,请尝试这样的操作:

s3://<mybucket>/*/*/*/*.snappy

前三个 * 指的是 /yyyy/mm/dd/ 子文件夹。

为证明此功能有效,您可以执行以下测试:

创建了一个 testDirectories 文件夹...并在其中嵌套了一些日期文件夹。

nestedDirectories/
-- 2016/
-- -- 12/
-- -- -- 15/
-- -- -- -- data.txt

并在 data.txt 内:

hello
world
I
Have
Some
Words

然后我 运行 pyspark:

>>> rdd = sc.readText("/path/to/nestedDirectories/*/*/*/*.txt")
>>> rdd.count()
6

所以,星型模式适用于将文件导入 RDD。

因此,从这里开始,如果您遇到内存和东西方面的​​问题,可能是因为您的文件太多而文件大小太小。这被称为 "small files problem" https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html