Spark Scala DataFrame Single Row conversion to JSON for PostgreSQL Insertion
Using a DataFrame named lastTail, I can iterate over it like this:
import scalikejdbc._
// ...
// Do Kafka Streaming to create DataFrame lastTail
// ...
lastTail.printSchema
lastTail.foreachPartition(iter => {
  // open database connection from connection pool
  // with scalikeJDBC (to PostgreSQL)
  while (iter.hasNext) {
    val item = iter.next()
    println("****")
    println(item.getClass)
    println(item.getAs("fileGid"))
    println("Schema: " + item.schema)
    println("String: " + item.toString())
    println("Sequence: " + item.toSeq)
    // convert this item into an XXX format (like JSON)
    // write row to DB in the selected format
  }
})
This outputs "something like" the following (redacted):
root
 |-- fileGid: string (nullable = true)
 |-- eventStruct: struct (nullable = false)
 |    |-- eventIndex: integer (nullable = true)
 |    |-- eventGid: string (nullable = true)
 |    |-- eventType: string (nullable = true)
 |-- revisionStruct: struct (nullable = false)
 |    |-- eventIndex: integer (nullable = true)
 |    |-- eventGid: string (nullable = true)
 |    |-- eventType: string (nullable = true)
and (for just one iteration item - redacted, but hopefully with good enough syntax as well):
****
class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
12345
Schema: StructType(StructField(fileGid,StringType,true), StructField(eventStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true)),false), StructField(revisionStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true), StructField(editIndex,IntegerType,true)),false))
String: [12345,[1,4,edit],[1,4,revision]]
Sequence: WrappedArray(12345, [1,4,edit], [1,4,revision])
Note: I am doing something like the val metric = iter.sum part of https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala, but with DataFrames instead. I am also following the "Design Patterns for using foreachRDD" section at http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning.
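For reference, that design pattern from the streaming guide boils down to roughly this (dstream, ConnectionPool, and connection.send are the guide's placeholders, not a real API):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}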
How do I convert this iteration item, an org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema (see https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala), into something that is easy to write (JSON or ...? - I am open to suggestions) into PostgreSQL? (If not JSON, please suggest how to read this value back into a DataFrame for use at another point.)
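One hand-rolled option would be to walk the Row's schema and rebuild a JSON object field by field. A minimal sketch, assuming the json4s library (which Spark ships with) is on the classpath; rowToJson and anyToJValue are hypothetical helpers of mine, not a Spark API:

import org.apache.spark.sql.Row
import org.json4s._
import org.json4s.jackson.JsonMethods._

// hypothetical helper: zip the schema's field names with the row's
// values and build a json4s JValue, recursing into nested struct Rows
def rowToJson(row: Row): JValue =
  JObject(row.schema.fieldNames.toList.zip(row.toSeq).map {
    case (name, value) => JField(name, anyToJValue(value))
  })

def anyToJValue(value: Any): JValue = value match {
  case null       => JNull
  case r: Row     => rowToJson(r) // nested struct
  case s: String  => JString(s)
  case i: Int     => JInt(i)
  case l: Long    => JInt(l)
  case d: Double  => JDouble(d)
  case b: Boolean => JBool(b)
  case other      => JString(other.toString) // fallback; extend as needed
}

// then, inside the while loop above:
//   val json = compact(render(rowToJson(item)))
//   // e.g. {"fileGid":"12345","eventStruct":{"eventIndex":1,...},...}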
Well, I figured out a different way to work around this problem.
// key: just the fileGid value; getString(0) avoids the surrounding
// brackets that Row.toString() would add
val ltk = lastTail.select($"fileGid").rdd.map(row => row.getString(0))
// value: each full row serialized to a JSON string by Spark itself
val ltv = lastTail.toJSON
// both RDDs derive from the same DataFrame, so the partitioning and
// per-partition element counts line up for zip()
val kvPair = ltk.zip(ltv)
Then I simply iterate over the RDD instead of the DataFrame:
kvPair.foreachPartition(iter => {
  while (iter.hasNext) {
    val item = iter.next()
    println(item.getClass)
    println(item)
  }
})
Data aside, I get class scala.Tuple2, which makes storing a KV pair via JDBC / PostgreSQL simpler.
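From there the actual insert is straightforward. A minimal sketch with scalikejdbc, assuming a connection pool has already been initialized on each executor and that a table like events (file_gid text, payload json) exists - the table and column names are hypothetical:

import scalikejdbc._

kvPair.foreachPartition(iter => {
  // assumes ConnectionPool.singleton(...) has already run on this executor
  DB.localTx { implicit session =>
    iter.foreach { case (key, json) =>
      // bind parameters are escaped by scalikejdbc; ::json is a PostgreSQL cast
      sql"insert into events (file_gid, payload) values ($key, $json::json)"
        .update.apply()
    }
  }
})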
I am sure there are still other ways that are not workarounds.
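For what it is worth, newer Spark releases make this less of a workaround: since Spark 2.1 there is a to_json function in org.apache.spark.sql.functions, so the same key/JSON pair can be built without leaving the DataFrame API (a sketch; this question predates 2.1):

import org.apache.spark.sql.functions.{col, struct, to_json}

// build (key, value) columns directly; struct(...) packs every column of
// the row back into one struct so to_json can serialize the whole row
val kv = lastTail.select(
  col("fileGid").as("key"),
  to_json(struct(lastTail.columns.map(col): _*)).as("value")
)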