使用模式将带有 Spark 的 AVRO 消息转换为 DataFrame
Use schema to convert AVRO messages with Spark to DataFrame
有没有办法使用模式来转换avro messages from kafka with spark to dataframe?用户记录的架构文件:
{
"fields": [
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" }
],
"name": "user",
"type": "record"
}
以及 SqlNetworkWordCount example and Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages 中的代码片段以读入消息。
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
...
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show()
})
case class User(firstName: String, lastName: String)
除了使用 case class 将 AVRO 消息转换为 DataFrame 之外,我找不到其他方法。是否有可能改用模式?我正在使用 Spark 1.6.2
和 Kafka 0.10
.
完整代码,以备不时之需。
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
object ReadMessagesFromKafka {
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
def main(args: Array[String]) {
val brokers = "127.0.0.1:9092"
val topics = "test"
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
ssc, kafkaParams, topicsSet)
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show()
})
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)
/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
我处理过类似的问题,但在 Java 中。所以对 Scala 不太确定,但请看一下库 com.databricks.spark.avro。
所以
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString,records.get("lastName").toString)).toDF()
你可以试试这个
val df = spark.read.avro(message._2.get)
OP 可能解决了这个问题,但为了将来参考,我很普遍地解决了这个问题,所以认为它可能对 post 这里有帮助。
所以一般来说,您应该将 Avro 模式转换为 spark StructType,并将 RDD 中的对象转换为 Row[Any],然后使用:
spark.createDataFrame(<RDD[obj] mapped to RDD[Row}>,<schema as StructType>
为了转换 Avro 模式,我使用了 spark-avro,如下所示:
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
RDD 的转换更棘手。如果你的模式很简单,你可能只做一个简单的映射。像这样:
rdd.map(obj=>{
val seq = (obj.getName(),obj.getAge()
Row.fromSeq(seq))
})
在此示例中,对象有 2 个字段名称和年龄。
重要的是确保 Row 中的元素与之前 StructType 中字段的顺序和类型相匹配。
在我的特殊情况下,我有一个复杂得多的对象,我想对其进行通用处理以支持未来的架构更改,因此我的代码要复杂得多。
OP 建议的方法也适用于某些 casese,但很难暗示复杂对象(不是原始或 case-class)
另一个提示是,如果您在 class 中有一个 class,您应该将 class 转换为一行,以便将环绕 class 转换为类似于:
Row(Any,Any,Any,Row,...)
你也可以看看我之前提到的关于如何将对象转换为行的 spark-avro 项目。我自己在那里使用了一些逻辑
如果阅读本文的人需要进一步帮助,请在评论中询问我,我会尽力提供帮助
同样的问题也解决了。
对于任何有兴趣以无需停止和重新部署您的 spark 应用程序(假设您的应用程序逻辑可以处理)的方式处理模式更改的人,请参阅此 question/answer。
有没有办法使用模式来转换avro messages from kafka with spark to dataframe?用户记录的架构文件:
{
"fields": [
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" }
],
"name": "user",
"type": "record"
}
以及 SqlNetworkWordCount example and Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages 中的代码片段以读入消息。
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
...
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show()
})
case class User(firstName: String, lastName: String)
除了使用 case class 将 AVRO 消息转换为 DataFrame 之外,我找不到其他方法。是否有可能改用模式?我正在使用 Spark 1.6.2
和 Kafka 0.10
.
完整代码,以备不时之需。
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
object ReadMessagesFromKafka {
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
def main(args: Array[String]) {
val brokers = "127.0.0.1:9092"
val topics = "test"
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
ssc, kafkaParams, topicsSet)
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show()
})
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)
/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
我处理过类似的问题,但在 Java 中。所以对 Scala 不太确定,但请看一下库 com.databricks.spark.avro。
所以
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString,records.get("lastName").toString)).toDF()
你可以试试这个
val df = spark.read.avro(message._2.get)
OP 可能解决了这个问题,但为了将来参考,我很普遍地解决了这个问题,所以认为它可能对 post 这里有帮助。
所以一般来说,您应该将 Avro 模式转换为 spark StructType,并将 RDD 中的对象转换为 Row[Any],然后使用:
spark.createDataFrame(<RDD[obj] mapped to RDD[Row}>,<schema as StructType>
为了转换 Avro 模式,我使用了 spark-avro,如下所示:
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
RDD 的转换更棘手。如果你的模式很简单,你可能只做一个简单的映射。像这样:
rdd.map(obj=>{
val seq = (obj.getName(),obj.getAge()
Row.fromSeq(seq))
})
在此示例中,对象有 2 个字段名称和年龄。
重要的是确保 Row 中的元素与之前 StructType 中字段的顺序和类型相匹配。
在我的特殊情况下,我有一个复杂得多的对象,我想对其进行通用处理以支持未来的架构更改,因此我的代码要复杂得多。
OP 建议的方法也适用于某些 casese,但很难暗示复杂对象(不是原始或 case-class)
另一个提示是,如果您在 class 中有一个 class,您应该将 class 转换为一行,以便将环绕 class 转换为类似于:
Row(Any,Any,Any,Row,...)
你也可以看看我之前提到的关于如何将对象转换为行的 spark-avro 项目。我自己在那里使用了一些逻辑
如果阅读本文的人需要进一步帮助,请在评论中询问我,我会尽力提供帮助
同样的问题也解决了
对于任何有兴趣以无需停止和重新部署您的 spark 应用程序(假设您的应用程序逻辑可以处理)的方式处理模式更改的人,请参阅此 question/answer。