Scala mapPartition 在分区上收集什么都不做

Scala mapPartition collect on partition do nothing

我正在尝试将数据从 rdd 移动到 postgres table,使用:

def copyIn(reader: java.io.Reader, columnStmt: String = "") = {
        //connect to postgres database on the localhost
        val driver = "org.postgresql.Driver"
        var connection:Connection = null
        Class.forName(driver)
        connection = DriverManager.getConnection()

        try {
            connection.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY my_table ($columnStmt) FROM STDIN WITH CSV", reader)
        } catch {
            case se: SQLException => println(se.getMessage)
            case t: Throwable => println(t.getMessage)
        } finally {
            connection.close()
        }
    }

myRdd.mapPartitions(iter => {
        val sb = new StringBuilder()

        var n_iter = iter.map(row => {
            val mapRequest = Utils.getMyRowMap(myMap, row)
            sb.append(mapRequest.values.mkString(", ")).append("\n")
        })

        copyIn(new StringReader(sb.toString), geoSelectMap.keySet.mkString(", "))
        sb.clear
        n_iter
    }).collect

脚本不断进入 CopyIn 函数,没有数据可插入。我想这可能是因为 iter.map 只是映射分区而不执行收集?我尝试收集整个 myRdd 对象,但仍然没有在 copyIn 函数中获取数据。

我如何遍历 rdd 并附加 StringBuilder 以及为什么上面的代码片段不起作用? 有人知道吗?

iter 是一个 Iterator。所以 iter.map 创建了一个新的 Iterator,但你实际上并没有迭代它,它什么也不做。您可能需要 foreach 来代替。除非 iter 在你 return 时为空,collect 的结果将是一个空的 RDD。

你想要的实际方法是foreachPartition:

myRdd.foreachPartition(iter => {
        val sb = new StringBuilder()

        iter.foreach(row => {
            val mapRequest = Utils.getMyRowMap(myMap, row)
            sb.append(mapRequest.values.mkString(", ")).append("\n")
        })

        copyIn(new StringReader(sb.toString), geoSelectMap.keySet.mkString(", "))
        sb.clear
    })

然后myRdd.collect如果你也想收集它。 (Persist myRdd 如果你想使用它两次而不重新计算它。)