Scala mapPartition 在分区上收集什么都不做
Scala mapPartition collect on partition do nothing
我正在尝试将数据从 rdd 移动到 postgres table,使用:
def copyIn(reader: java.io.Reader, columnStmt: String = "") = {
//connect to postgres database on the localhost
val driver = "org.postgresql.Driver"
var connection:Connection = null
Class.forName(driver)
connection = DriverManager.getConnection()
try {
connection.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY my_table ($columnStmt) FROM STDIN WITH CSV", reader)
} catch {
case se: SQLException => println(se.getMessage)
case t: Throwable => println(t.getMessage)
} finally {
connection.close()
}
}
myRdd.mapPartitions(iter => {
val sb = new StringBuilder()
var n_iter = iter.map(row => {
val mapRequest = Utils.getMyRowMap(myMap, row)
sb.append(mapRequest.values.mkString(", ")).append("\n")
})
copyIn(new StringReader(sb.toString), geoSelectMap.keySet.mkString(", "))
sb.clear
n_iter
}).collect
脚本不断进入 CopyIn 函数,没有数据可插入。我想这可能是因为 iter.map 只是映射分区而不执行收集?我尝试收集整个 myRdd 对象,但仍然没有在 copyIn 函数中获取数据。
我如何遍历 rdd 并附加 StringBuilder 以及为什么上面的代码片段不起作用?
有人知道吗?
iter
是一个 Iterator
。所以 iter.map
创建了一个新的 Iterator
,但你实际上并没有迭代它,它什么也不做。您可能需要 foreach
来代替。除非 iter
在你 return 时为空,collect
的结果将是一个空的 RDD。
你想要的实际方法是foreachPartition
:
myRdd.foreachPartition(iter => {
val sb = new StringBuilder()
iter.foreach(row => {
val mapRequest = Utils.getMyRowMap(myMap, row)
sb.append(mapRequest.values.mkString(", ")).append("\n")
})
copyIn(new StringReader(sb.toString), geoSelectMap.keySet.mkString(", "))
sb.clear
})
然后myRdd.collect
如果你也想收集它。 (Persist myRdd
如果你想使用它两次而不重新计算它。)
我正在尝试将数据从 rdd 移动到 postgres table,使用:
def copyIn(reader: java.io.Reader, columnStmt: String = "") = {
//connect to postgres database on the localhost
val driver = "org.postgresql.Driver"
var connection:Connection = null
Class.forName(driver)
connection = DriverManager.getConnection()
try {
connection.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY my_table ($columnStmt) FROM STDIN WITH CSV", reader)
} catch {
case se: SQLException => println(se.getMessage)
case t: Throwable => println(t.getMessage)
} finally {
connection.close()
}
}
myRdd.mapPartitions(iter => {
val sb = new StringBuilder()
var n_iter = iter.map(row => {
val mapRequest = Utils.getMyRowMap(myMap, row)
sb.append(mapRequest.values.mkString(", ")).append("\n")
})
copyIn(new StringReader(sb.toString), geoSelectMap.keySet.mkString(", "))
sb.clear
n_iter
}).collect
脚本不断进入 CopyIn 函数,没有数据可插入。我想这可能是因为 iter.map 只是映射分区而不执行收集?我尝试收集整个 myRdd 对象,但仍然没有在 copyIn 函数中获取数据。
我如何遍历 rdd 并附加 StringBuilder 以及为什么上面的代码片段不起作用? 有人知道吗?
iter
是一个 Iterator
。所以 iter.map
创建了一个新的 Iterator
,但你实际上并没有迭代它,它什么也不做。您可能需要 foreach
来代替。除非 iter
在你 return 时为空,collect
的结果将是一个空的 RDD。
你想要的实际方法是foreachPartition
:
myRdd.foreachPartition(iter => {
val sb = new StringBuilder()
iter.foreach(row => {
val mapRequest = Utils.getMyRowMap(myMap, row)
sb.append(mapRequest.values.mkString(", ")).append("\n")
})
copyIn(new StringReader(sb.toString), geoSelectMap.keySet.mkString(", "))
sb.clear
})
然后myRdd.collect
如果你也想收集它。 (Persist myRdd
如果你想使用它两次而不重新计算它。)