pyspark如何加载压缩的snappy文件
pyspark how to load compressed snappy file
我使用 python-snappy 压缩了一个文件并将其放入我的 hdfs 存储区。我现在正尝试像这样阅读它,但我得到以下回溯。我找不到如何读取文件的示例,因此我无法处理它。我可以很好地阅读文本文件(未压缩)版本。我应该使用 sc.sequenceFile 吗?谢谢!
I first compressed the file and pushed it to hdfs
python-snappy -m snappy -c gene_regions.vcf gene_regions.vcf.snappy
hdfs dfs -put gene_regions.vcf.snappy /
I then added the following to spark-env.sh
export SPARK_EXECUTOR_MEMORY=16G
export HADOOP_HOME=/usr/local/hadoop
export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:$HADOOP_HOME/lib/native
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_HOME/lib/lib/snappy-java-1.1.1.8-SNAPSHOT.jar
I then launch my spark master and slave and finally my ipython notebook where I am executing the code below.
a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()
ValueError Traceback(最后一次调用)
在 ()
----> 1 a_file.first()
/home/user/Software/spark-1.3.0-bin-hadoop2.4/python/pyspark/rdd.pyc in first(self)
1244 如果 rs:
第1245章return[0]
-> 1246 引发 ValueError("RDD is empty")
1247
1248 def isEmpty(自身):
ValueError: RDD 为空
Working code (uncompressed) text file
a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf")
a_file.first()
输出:
u'##fileformat=VCFv4.1'
这里的问题是 python-snappy 与 Hadoop 的 snappy 编解码器不兼容,Spark 在看到“.snappy”后缀时将使用它来读取数据。它们基于相同的底层算法,但它们不兼容,因为您可以用一个压缩并用另一个解压缩。
您可以通过首先使用 Spark 或 Hadoop 将数据写入 snappy 来完成这项工作。或者让 Spark 将您的数据读取为二进制 blob,然后您自己手动调用 python-snappy 解压缩(参见此处的 binaryFiles http://spark.apache.org/docs/latest/api/python/pyspark.html)。二进制 blob 方法有点脆弱,因为它需要为每个输入文件将整个文件放入内存。但是,如果您的数据足够小,那也行得通。
好的,我找到了解决办法!
建立这个...
https://github.com/liancheng/snappy-utils
在 ubuntu 14.10 上,我必须安装 gcc-4.4 才能构建它,评论我在这里看到的错误
https://code.google.com/p/hadoop-snappy/issues/detail?id=9
我现在可以像这样在命令行中使用 snappy 压缩文本文件
snappy -c gene_regions.vcf -o gene_regions.vcf.snappy
将其转储到 hdfs
hdfs dfs -put gene_regions.vcf.snappy
然后加载到pyspark中!
a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()
瞧! vcf的header...
u'##fileformat=VCFv4.1'
不确定我的文件有哪个 snappy
编解码器,但 spark.read.text
对我来说没有任何问题。
已接受的答案现已过时。你可以使用 python-snappy 来压缩 hadoop-snappy,但是几乎没有文档。
示例:
import snappy
with open('test.json.snappy', 'wb') as out_file:
data=json.dumps({'test':'somevalue','test2':'somevalue2'}).encode('utf-8')
compressor = snappy.hadoop_snappy.StreamCompressor()
compressed = compressor.compress(data)
out_file.write(compressed)
您也可以使用命令行,其中的选项更直接一些,使用 -t hadoop_snappy 标志。示例:
echo "{'test':'somevalue','test2':'somevalue2'}" | python -m snappy -t hadoop_snappy -c - test.json.snappy
我使用 python-snappy 压缩了一个文件并将其放入我的 hdfs 存储区。我现在正尝试像这样阅读它,但我得到以下回溯。我找不到如何读取文件的示例,因此我无法处理它。我可以很好地阅读文本文件(未压缩)版本。我应该使用 sc.sequenceFile 吗?谢谢!
I first compressed the file and pushed it to hdfs
python-snappy -m snappy -c gene_regions.vcf gene_regions.vcf.snappy
hdfs dfs -put gene_regions.vcf.snappy /
I then added the following to spark-env.sh
export SPARK_EXECUTOR_MEMORY=16G
export HADOOP_HOME=/usr/local/hadoop
export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:$HADOOP_HOME/lib/native
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_HOME/lib/lib/snappy-java-1.1.1.8-SNAPSHOT.jar
I then launch my spark master and slave and finally my ipython notebook where I am executing the code below.
a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()
ValueError Traceback(最后一次调用) 在 () ----> 1 a_file.first()
/home/user/Software/spark-1.3.0-bin-hadoop2.4/python/pyspark/rdd.pyc in first(self) 1244 如果 rs: 第1245章return[0] -> 1246 引发 ValueError("RDD is empty") 1247 1248 def isEmpty(自身):
ValueError: RDD 为空
Working code (uncompressed) text file
a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf")
a_file.first()
输出: u'##fileformat=VCFv4.1'
这里的问题是 python-snappy 与 Hadoop 的 snappy 编解码器不兼容,Spark 在看到“.snappy”后缀时将使用它来读取数据。它们基于相同的底层算法,但它们不兼容,因为您可以用一个压缩并用另一个解压缩。
您可以通过首先使用 Spark 或 Hadoop 将数据写入 snappy 来完成这项工作。或者让 Spark 将您的数据读取为二进制 blob,然后您自己手动调用 python-snappy 解压缩(参见此处的 binaryFiles http://spark.apache.org/docs/latest/api/python/pyspark.html)。二进制 blob 方法有点脆弱,因为它需要为每个输入文件将整个文件放入内存。但是,如果您的数据足够小,那也行得通。
好的,我找到了解决办法!
建立这个... https://github.com/liancheng/snappy-utils 在 ubuntu 14.10 上,我必须安装 gcc-4.4 才能构建它,评论我在这里看到的错误 https://code.google.com/p/hadoop-snappy/issues/detail?id=9
我现在可以像这样在命令行中使用 snappy 压缩文本文件
snappy -c gene_regions.vcf -o gene_regions.vcf.snappy
将其转储到 hdfs
hdfs dfs -put gene_regions.vcf.snappy
然后加载到pyspark中!
a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()
瞧! vcf的header...
u'##fileformat=VCFv4.1'
不确定我的文件有哪个 snappy
编解码器,但 spark.read.text
对我来说没有任何问题。
已接受的答案现已过时。你可以使用 python-snappy 来压缩 hadoop-snappy,但是几乎没有文档。 示例:
import snappy
with open('test.json.snappy', 'wb') as out_file:
data=json.dumps({'test':'somevalue','test2':'somevalue2'}).encode('utf-8')
compressor = snappy.hadoop_snappy.StreamCompressor()
compressed = compressor.compress(data)
out_file.write(compressed)
您也可以使用命令行,其中的选项更直接一些,使用 -t hadoop_snappy 标志。示例:
echo "{'test':'somevalue','test2':'somevalue2'}" | python -m snappy -t hadoop_snappy -c - test.json.snappy