如何在 Spark 2.1 中保存分区的镶木地板文件?
How to save a partitioned parquet file in Spark 2.1?
我正在尝试测试如何使用 Spark 2.1 在 HDFS 2.7 中写入数据。我的数据是一个简单的虚拟值序列,输出应该按属性划分:id 和 key.
// Simple case class to cast the data
case class SimpleTest(id:String, value1:Int, value2:Float, key:Int)
// Actual data to be stored
val testData = Seq(
SimpleTest("test", 12, 13.5.toFloat, 1),
SimpleTest("test", 12, 13.5.toFloat, 2),
SimpleTest("test", 12, 13.5.toFloat, 3),
SimpleTest("simple", 12, 13.5.toFloat, 1),
SimpleTest("simple", 12, 13.5.toFloat, 2),
SimpleTest("simple", 12, 13.5.toFloat, 3)
)
// Spark's workflow to distribute, partition and store
// sc and sql are the SparkContext and SparkSession, respectively
val testDataP = sc.parallelize(testData, 6)
val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
testDf.write.partitionBy("id", "key").parquet("/path/to/file")
我希望在 HDFS 中得到以下树结构:
- /path/to/file
|- /id=test/key=1/part-01.parquet
|- /id=test/key=2/part-02.parquet
|- /id=test/key=3/part-03.parquet
|- /id=simple/key=1/part-04.parquet
|- /id=simple/key=2/part-05.parquet
|- /id=simple/key=3/part-06.parquet
但是当我 运行 之前的代码时,我得到以下输出:
/path/to/file/id=/key=24/
|-/part-01.parquet
|-/part-02.parquet
|-/part-03.parquet
|-/part-04.parquet
|-/part-05.parquet
|-/part-06.parquet
不知道是代码有问题,还是Spark在做别的事情
我正在执行 spark-submit
如下:
spark-submit --name APP --master local --driver-memory 30G --executor-memory 30G --executor-cores 8 --num-executors 8 --conf spark.io.compression.codec=lzf --conf spark.akka.frameSize=1024 --conf spark.driver.maxResultSize=1g --conf spark.sql.orc.compression.codec=uncompressed --conf spark.sql.parquet.filterPushdown=true --class myClass myFatJar.jar
有趣,因为...嗯..."it works for me".
当您在 Spark 2.1 中使用 SimpleTest
案例 class 描述您的数据集时,您 import spark.implicits._
需要输入 Dataset
.
在我的例子中,spark
是 sql
。
换句话说,您不必创建 testDataP
和 testDf
(使用 sql.createDataFrame
)。
import spark.implicits._
...
val testDf = testData.toDS
testDf.write.partitionBy("id", "key").parquet("/path/to/file")
在另一个终端中(保存到 /tmp/testDf
目录后):
$ tree /tmp/testDf/
/tmp/testDf/
├── _SUCCESS
├── id=simple
│ ├── key=1
│ │ └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│ ├── key=2
│ │ └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│ └── key=3
│ └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
└── id=test
├── key=1
│ └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
├── key=2
│ └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
└── key=3
└── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
8 directories, 7 files
我找到了解决办法!根据 Cloudera 的说法,是 mapred-site.xml 配置问题(检查下面的 link)。另外,不要将数据帧写成:testDf.write.partitionBy("id", "key").parquet("/path/to/file")
我是这样做的:testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file")
。您可以分别用 HDFS 的主节点名称和端口替换 <namenode>
和 <port>
。
特别感谢@jacek-laskowski 的宝贵贡献。
参考文献:
https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/m-p/36363#M1090
Writing to HDFS in Spark/Scala
我正在尝试测试如何使用 Spark 2.1 在 HDFS 2.7 中写入数据。我的数据是一个简单的虚拟值序列,输出应该按属性划分:id 和 key.
// Simple case class to cast the data
case class SimpleTest(id:String, value1:Int, value2:Float, key:Int)
// Actual data to be stored
val testData = Seq(
SimpleTest("test", 12, 13.5.toFloat, 1),
SimpleTest("test", 12, 13.5.toFloat, 2),
SimpleTest("test", 12, 13.5.toFloat, 3),
SimpleTest("simple", 12, 13.5.toFloat, 1),
SimpleTest("simple", 12, 13.5.toFloat, 2),
SimpleTest("simple", 12, 13.5.toFloat, 3)
)
// Spark's workflow to distribute, partition and store
// sc and sql are the SparkContext and SparkSession, respectively
val testDataP = sc.parallelize(testData, 6)
val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
testDf.write.partitionBy("id", "key").parquet("/path/to/file")
我希望在 HDFS 中得到以下树结构:
- /path/to/file
|- /id=test/key=1/part-01.parquet
|- /id=test/key=2/part-02.parquet
|- /id=test/key=3/part-03.parquet
|- /id=simple/key=1/part-04.parquet
|- /id=simple/key=2/part-05.parquet
|- /id=simple/key=3/part-06.parquet
但是当我 运行 之前的代码时,我得到以下输出:
/path/to/file/id=/key=24/
|-/part-01.parquet
|-/part-02.parquet
|-/part-03.parquet
|-/part-04.parquet
|-/part-05.parquet
|-/part-06.parquet
不知道是代码有问题,还是Spark在做别的事情
我正在执行 spark-submit
如下:
spark-submit --name APP --master local --driver-memory 30G --executor-memory 30G --executor-cores 8 --num-executors 8 --conf spark.io.compression.codec=lzf --conf spark.akka.frameSize=1024 --conf spark.driver.maxResultSize=1g --conf spark.sql.orc.compression.codec=uncompressed --conf spark.sql.parquet.filterPushdown=true --class myClass myFatJar.jar
有趣,因为...嗯..."it works for me".
当您在 Spark 2.1 中使用 SimpleTest
案例 class 描述您的数据集时,您 import spark.implicits._
需要输入 Dataset
.
在我的例子中,spark
是 sql
。
换句话说,您不必创建 testDataP
和 testDf
(使用 sql.createDataFrame
)。
import spark.implicits._
...
val testDf = testData.toDS
testDf.write.partitionBy("id", "key").parquet("/path/to/file")
在另一个终端中(保存到 /tmp/testDf
目录后):
$ tree /tmp/testDf/
/tmp/testDf/
├── _SUCCESS
├── id=simple
│ ├── key=1
│ │ └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│ ├── key=2
│ │ └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│ └── key=3
│ └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
└── id=test
├── key=1
│ └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
├── key=2
│ └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
└── key=3
└── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
8 directories, 7 files
我找到了解决办法!根据 Cloudera 的说法,是 mapred-site.xml 配置问题(检查下面的 link)。另外,不要将数据帧写成:testDf.write.partitionBy("id", "key").parquet("/path/to/file")
我是这样做的:testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file")
。您可以分别用 HDFS 的主节点名称和端口替换 <namenode>
和 <port>
。
特别感谢@jacek-laskowski 的宝贵贡献。
参考文献:
https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/m-p/36363#M1090
Writing to HDFS in Spark/Scala