在 Spark 中读取 Avro 文件并提取列值
Reading Avro file in Spark and extracting column values
我想使用 Spark 读取 avro 文件(我使用的是 Spark 1.3.0,所以我没有数据帧)
我用这段代码读取了avro文件
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext
private def readAvro(sparkContext: SparkContext, path: String) = {
sparkContext.newAPIHadoopFile[
AvroKey[GenericRecord],
NullWritable,
AvroKeyInputFormat[GenericRecord]
](path)
}
我执行这个并得到一个 RDD。现在从 RDD,我如何提取特定列的值?喜欢遍历所有记录并给出列名的值?
[edit]按照下面贾斯汀的建议,我试过了
val rdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](input)
rdd.map(record=> record._1.get("accountId")).toArray().foreach(println)
但是我得到一个错误
<console>:34: error: value get is not a member of org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]
rdd.map(record=> record._1.get("accountId")).toArray().foreach(println)
AvroKey
有一个 datum
方法来提取包装值。 GenericRecord
有一个 get
方法接受列名作为字符串。所以你可以使用 map
提取列
rdd.map(record=>record._1.datum.get("COLNAME"))
我想使用 Spark 读取 avro 文件(我使用的是 Spark 1.3.0,所以我没有数据帧)
我用这段代码读取了avro文件
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext
private def readAvro(sparkContext: SparkContext, path: String) = {
sparkContext.newAPIHadoopFile[
AvroKey[GenericRecord],
NullWritable,
AvroKeyInputFormat[GenericRecord]
](path)
}
我执行这个并得到一个 RDD。现在从 RDD,我如何提取特定列的值?喜欢遍历所有记录并给出列名的值?
[edit]按照下面贾斯汀的建议,我试过了
val rdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](input)
rdd.map(record=> record._1.get("accountId")).toArray().foreach(println)
但是我得到一个错误
<console>:34: error: value get is not a member of org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]
rdd.map(record=> record._1.get("accountId")).toArray().foreach(println)
AvroKey
有一个 datum
方法来提取包装值。 GenericRecord
有一个 get
方法接受列名作为字符串。所以你可以使用 map
rdd.map(record=>record._1.datum.get("COLNAME"))