Spark Scala 从 rdd.foreachPartition 取回数据
Spark Scala Get Data Back from rdd.foreachPartition
我有一些这样的代码:
println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
val lastRevs = distinctFileGidsRDD.
foreachPartition(iter => {
SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
while(iter.hasNext) {
val item = iter.next()
//println(item(0))
println("String: "+item(0).toString())
val jsonStr = DB.readOnly { implicit session =>
sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
map { resultSet => resultSet.string(1) }.single.apply()
}
println("\nJSON: "+jsonStr)
}
})
println("\nEND Last Revs Class: "+ lastRevs.getClass)
代码输出(经过大量编辑)类似于:
BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None()
END Last Revs Class: void
问题 1:
我怎样才能使 lastRevs 值采用有用的格式,如 JSON string/null 或像 Some / None?
这样的选项
问题 2:
我的偏好:是否有另一种方法获取类似 RDD 格式(而不是迭代器格式)的分区数据?
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
来自 http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
问题 3:我使用的获取数据的方法是否合理(假设我遵循上面的 link)? (撇开目前这是一个 scalikejdbc 系统的事实 JDBC。这将是一个键值存储,除此原型之外的某种类型。)
要创建使用执行程序本地资源(例如数据库或网络连接)的转换,您应该使用 rdd.mapPartitions
。它允许在执行器本地初始化一些代码,并使用这些本地资源来处理分区中的数据。
代码应如下所示:
val lastRevs = distinctFileGidsRDD.
mapPartitions{iter =>
SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
iter.map{ element =>
DB.readOnly { implicit session =>
sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar"
.map { resultSet => resultSet.string(1) }.single.apply()
}
}
}
我有一些这样的代码:
println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
val lastRevs = distinctFileGidsRDD.
foreachPartition(iter => {
SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
while(iter.hasNext) {
val item = iter.next()
//println(item(0))
println("String: "+item(0).toString())
val jsonStr = DB.readOnly { implicit session =>
sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
map { resultSet => resultSet.string(1) }.single.apply()
}
println("\nJSON: "+jsonStr)
}
})
println("\nEND Last Revs Class: "+ lastRevs.getClass)
代码输出(经过大量编辑)类似于:
BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None()
END Last Revs Class: void
问题 1: 我怎样才能使 lastRevs 值采用有用的格式,如 JSON string/null 或像 Some / None?
这样的选项问题 2: 我的偏好:是否有另一种方法获取类似 RDD 格式(而不是迭代器格式)的分区数据?
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
来自 http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
问题 3:我使用的获取数据的方法是否合理(假设我遵循上面的 link)? (撇开目前这是一个 scalikejdbc 系统的事实 JDBC。这将是一个键值存储,除此原型之外的某种类型。)
要创建使用执行程序本地资源(例如数据库或网络连接)的转换,您应该使用 rdd.mapPartitions
。它允许在执行器本地初始化一些代码,并使用这些本地资源来处理分区中的数据。
代码应如下所示:
val lastRevs = distinctFileGidsRDD.
mapPartitions{iter =>
SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
iter.map{ element =>
DB.readOnly { implicit session =>
sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar"
.map { resultSet => resultSet.string(1) }.single.apply()
}
}
}