点燃拼花地板
Spark avro to parquet
我有一个 avro 格式的数据流(json 编码)需要存储为 parquet 文件。我只能这样做,
val df = sqc.read.json(jsonRDD).toDF()
并将 df 写为 parquet。
这里的架构是从 json 中推断出来的。但是我已经有了 avsc 文件,我不希望 spark 从 json.
推断模式
并且在上述方式中,parquet 文件将架构信息存储为 StructType 而不是 avro.record.type。有没有办法存储 avro 模式信息。
火花 - 1.4.1
您可以以编程方式指定架构
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
请看:http://spark.apache.org/docs/latest/sql-programming-guide.html
spark-avro 然后使用模式类型指定 avro 类型,如下所示
- Spark SQL 类型 -> Avro 类型
- 字节类型 -> 整数
- 短类型 -> 整数
- 小数类型 -> 字符串
- 二进制类型 -> 字节
- 时间戳类型 -> 长
- 结构类型 -> 记录
你可以这样写Avro记录:
import com.databricks.spark.avro._
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = Seq((2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
.toDF("year", "month", "title", "rating")
df.write.partitionBy("year", "month").avro("/tmp/output")
最终使用了这个问题的答案avro-schema-to-spark-structtype
def getSparkSchemaForAvro(sqc: SQLContext, avroSchema: Schema): StructType = {
val dummyFIle = File.createTempFile("avro_dummy", "avro")
val datumWriter = new GenericDatumWriter[wuser]()
datumWriter.setSchema(avroSchema)
val writer = new DataFileWriter(datumWriter).create(avroSchema, dummyFIle)
writer.flush()
writer.close()
val df = sqc.read.format("com.databricks.spark.avro").load(dummyFIle.getAbsolutePath)
df.schema
}
我有一个 avro 格式的数据流(json 编码)需要存储为 parquet 文件。我只能这样做,
val df = sqc.read.json(jsonRDD).toDF()
并将 df 写为 parquet。
这里的架构是从 json 中推断出来的。但是我已经有了 avsc 文件,我不希望 spark 从 json.
推断模式并且在上述方式中,parquet 文件将架构信息存储为 StructType 而不是 avro.record.type。有没有办法存储 avro 模式信息。
火花 - 1.4.1
您可以以编程方式指定架构
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
请看:http://spark.apache.org/docs/latest/sql-programming-guide.html
spark-avro 然后使用模式类型指定 avro 类型,如下所示
- Spark SQL 类型 -> Avro 类型
- 字节类型 -> 整数
- 短类型 -> 整数
- 小数类型 -> 字符串
- 二进制类型 -> 字节
- 时间戳类型 -> 长
- 结构类型 -> 记录
你可以这样写Avro记录:
import com.databricks.spark.avro._
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = Seq((2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
.toDF("year", "month", "title", "rating")
df.write.partitionBy("year", "month").avro("/tmp/output")
最终使用了这个问题的答案avro-schema-to-spark-structtype
def getSparkSchemaForAvro(sqc: SQLContext, avroSchema: Schema): StructType = {
val dummyFIle = File.createTempFile("avro_dummy", "avro")
val datumWriter = new GenericDatumWriter[wuser]()
datumWriter.setSchema(avroSchema)
val writer = new DataFileWriter(datumWriter).create(avroSchema, dummyFIle)
writer.flush()
writer.close()
val df = sqc.read.format("com.databricks.spark.avro").load(dummyFIle.getAbsolutePath)
df.schema
}