如何使用 Scala 以优雅的方式处理 Spark 中的 Avro
How to handle Avro in Spark with Scala in elegant way
我正在 Scala 中从事从 avro 文件读取数据的 spark 作业。
开始很简单:
val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
但后来就不优雅了,因为我需要对元组进行操作,即。
avroRDD.map(x => (x.get("value").asInstanceOf[Long],x.get("start_time").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String])).
map(x => (asDate(x._2),(x._1,x._3,x._4,x._5))).
reduceByKey((x,y) => (x._1+y._1,x._2+y._2,x._3+y._3,y._4)).
map(x => List(x._1,x._2._1,x._2._2,x._2._3,x._2._4).mkString(","))
...
我正在考虑使用 Map 而不是元组,但如果我有几种不同的类型,即 Long 和 String,它将导致 Map[String,Any]
并在每个操作上进行强制转换。
即
avroRDD.map(x => Map("value" -> x.get("value").asInstanceOf[Long],"start_time" -> x.get("start_time").asInstanceOf[Long],"level" -> x.get("level").asInstanceOf[Double],"size" -> x.get("size").asInstanceOf[Long],"category" -> x.get("category").asInstanceOf[String])).
map(x => (asDate(x.get("start_time).asInstanceOf[Long]),(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
...
另一种解决方案是使用 case 类 并将值包装到其中,但有时它可能会导致大量 case 类 定义,即:
case class TestClass(value: Long, level:Double, size:Long, category:String)
avroRDD.map(x => (x.get("start_time").asInstanceOf[Long],TestClass(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
map(x => (asDate(x._1),x._2)).
reduceByKey((x,y) => (x.value+y.value,x.level+y.level,x.size+y.size,y.category)).
map(x => List(x._1,x._2.value,x._2.level,x._2.size,x._2.category).mkString(","))
...
我想知道在这种情况下是否有更好的方法来处理通用记录 - 您不需要不断地转换为特定类型并且可以对字段名称进行操作的方法。命名元组之类的东西可以完成这项工作。
你知道更好的方法吗?
你们是怎么处理这种情况的?
有模式匹配:
map { case (value, startTime, level, size, category) =>
(asDate(startTime), (value,level,size,category))
}.reduceByKey { case ((value1, level1, size1, category1), (value2, level2, size2, category2)) =>
(value1+value2, level1+level2, size1+size2, category2)
}.map { case (startTime, (value, level, size, category)) =>
List(startTime, value, level, size, category).mkString(","))
}
如果您有一些经常重复使用的元组,请为它们使用 case 类。
我正在 Scala 中从事从 avro 文件读取数据的 spark 作业。 开始很简单:
val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
但后来就不优雅了,因为我需要对元组进行操作,即。
avroRDD.map(x => (x.get("value").asInstanceOf[Long],x.get("start_time").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String])).
map(x => (asDate(x._2),(x._1,x._3,x._4,x._5))).
reduceByKey((x,y) => (x._1+y._1,x._2+y._2,x._3+y._3,y._4)).
map(x => List(x._1,x._2._1,x._2._2,x._2._3,x._2._4).mkString(","))
...
我正在考虑使用 Map 而不是元组,但如果我有几种不同的类型,即 Long 和 String,它将导致 Map[String,Any]
并在每个操作上进行强制转换。
即
avroRDD.map(x => Map("value" -> x.get("value").asInstanceOf[Long],"start_time" -> x.get("start_time").asInstanceOf[Long],"level" -> x.get("level").asInstanceOf[Double],"size" -> x.get("size").asInstanceOf[Long],"category" -> x.get("category").asInstanceOf[String])).
map(x => (asDate(x.get("start_time).asInstanceOf[Long]),(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
...
另一种解决方案是使用 case 类 并将值包装到其中,但有时它可能会导致大量 case 类 定义,即:
case class TestClass(value: Long, level:Double, size:Long, category:String)
avroRDD.map(x => (x.get("start_time").asInstanceOf[Long],TestClass(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
map(x => (asDate(x._1),x._2)).
reduceByKey((x,y) => (x.value+y.value,x.level+y.level,x.size+y.size,y.category)).
map(x => List(x._1,x._2.value,x._2.level,x._2.size,x._2.category).mkString(","))
...
我想知道在这种情况下是否有更好的方法来处理通用记录 - 您不需要不断地转换为特定类型并且可以对字段名称进行操作的方法。命名元组之类的东西可以完成这项工作。
你知道更好的方法吗?
你们是怎么处理这种情况的?
有模式匹配:
map { case (value, startTime, level, size, category) =>
(asDate(startTime), (value,level,size,category))
}.reduceByKey { case ((value1, level1, size1, category1), (value2, level2, size2, category2)) =>
(value1+value2, level1+level2, size1+size2, category2)
}.map { case (startTime, (value, level, size, category)) =>
List(startTime, value, level, size, category).mkString(","))
}
如果您有一些经常重复使用的元组,请为它们使用 case 类。