如何从空 RDD 读取 Avro 模式?
How to read Avro schema from empty RDD?
我正在使用 AvroKeyInputFormat
读取 avro 文件:
val records = sc.newAPIHadoopFile[AvroKey[T], NullWritable, AvroKeyInputFormat[T]](path)
.map(_._1.datum())
因为我需要在工作中反思模式,所以我得到了这样的 Avro 模式:
val schema = records.first.getSchema
不幸的是,如果 path
中的 avro 文件是空的(它们包括编写器模式,但没有记录),这将失败。
有没有简单的方法即使没有记录也只用 Spark 加载 avro 模式?
我找到了一个解决方案(灵感来自 com.databricks.spark.avro.DefaultSource
):
/**
* Loads a schema from avro files in `directory`. This method also works if none
* of the avro files contain any records.
*/
def schema(directory: String)(implicit sc: SparkContext): Schema = {
val fs = FileSystem.get(new URI(directory), sc.hadoopConfiguration)
val it = fs.listFiles(new Path(directory), false)
var avroFile: Option[FileStatus] = None
while (it.hasNext && avroFile.isEmpty) {
val fileStatus = it.next()
if (fileStatus.isFile && fileStatus.getPath.getName.endsWith(".avro")) {
avroFile = Some(fileStatus)
}
}
avroFile.fold {
throw new Exception(s"No avro files found in $directory")
} { file =>
val in = new FsInput(file.getPath, sc.hadoopConfiguration)
try {
val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
try {
reader.getSchema
} finally {
reader.close()
}
} finally {
in.close()
}
}
}
我正在使用 AvroKeyInputFormat
读取 avro 文件:
val records = sc.newAPIHadoopFile[AvroKey[T], NullWritable, AvroKeyInputFormat[T]](path)
.map(_._1.datum())
因为我需要在工作中反思模式,所以我得到了这样的 Avro 模式:
val schema = records.first.getSchema
不幸的是,如果 path
中的 avro 文件是空的(它们包括编写器模式,但没有记录),这将失败。
有没有简单的方法即使没有记录也只用 Spark 加载 avro 模式?
我找到了一个解决方案(灵感来自 com.databricks.spark.avro.DefaultSource
):
/**
* Loads a schema from avro files in `directory`. This method also works if none
* of the avro files contain any records.
*/
def schema(directory: String)(implicit sc: SparkContext): Schema = {
val fs = FileSystem.get(new URI(directory), sc.hadoopConfiguration)
val it = fs.listFiles(new Path(directory), false)
var avroFile: Option[FileStatus] = None
while (it.hasNext && avroFile.isEmpty) {
val fileStatus = it.next()
if (fileStatus.isFile && fileStatus.getPath.getName.endsWith(".avro")) {
avroFile = Some(fileStatus)
}
}
avroFile.fold {
throw new Exception(s"No avro files found in $directory")
} { file =>
val in = new FsInput(file.getPath, sc.hadoopConfiguration)
try {
val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
try {
reader.getSchema
} finally {
reader.close()
}
} finally {
in.close()
}
}
}