将分析数据从 Spark 插入到 Postgres

Inserting Analytic data from Spark to Postgres

我有 Cassandra 数据库,我通过 Apache Spark 使用 SparkSQL 分析了其中的数据。现在我想将那些分析过的数据插入到 PostgreSQL 中。除了使用 PostgreSQL 驱动程序(我使用 postREST 和驱动程序实现它,我想知道是否有 saveToCassandra() 之类的方法)之外,是否有任何方法可以直接实现此目的?

目前还没有将 RDD 写入任何 DBMS 的本机实现。以下是 Spark 用户列表中相关讨论的链接:one, two

一般来说,最有效的方法如下:

  1. 验证RDD的分区数,不能太低也不能太高。 20-50 个分区应该没问题,如果数量较少 - 调用 repartition 20 个分区,如果较高 - 调用 coalesce 50 个分区
  2. 调用 mapPartition 转换,在其中调用使用 JDBC 将记录插入 DBMS 的函数。在此函数中,您打开与数据库的连接,并使用带有 this API 的 COPY 命令,这将允许您消除对每条记录的单独命令的需要 - 这样插入将被处理得更快

通过这种方式,您可以使用最多 50 个并行连接(取决于您的 Spark 集群大小及其配置)以并行方式将数据插入 Postgres。整个方法可以作为接受 RDD 和连接字符串

的 Java/Scala 函数来实现

0x0FFF 的回答很好。这是一个有用的附加点。

我使用 foreachPartition 持久化到外部存储。这也符合 Spark 文档中给出的设计模式 Design Patterns for using foreachRDD https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams

示例:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

你可以用Postgres copy api来写,那样快多了。请参阅以下两种方法 - 一种遍历 RDD 以填充可以通过复制 api 保存的缓冲区。您唯一需要注意的是以 csv 格式创建正确的语句,副本 api.

将使用该语句
def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
        val sb = mutable.StringBuilder.newBuilder
        val now = System.currentTimeMillis()

        rdd.collect().foreach(itr => {
            itr.foreach(_.createCSV(sb, now).append("\n"))
        })

        copyIn("myTable",  new StringReader(sb.toString), "statement")
        sb.clear
    }


def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
        val conn = connectionPool.getConnection()
        try {
            conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
        } catch {
            case se: SQLException => logWarning(se.getMessage)
            case t: Throwable => logWarning(t.getMessage)
        } finally {
            conn.close()
        }
    }

上面的答案是指旧的 spark 版本,在 spark 2.* 中有 jdbc 连接器,可以直接从数据帧写入 RDBS。

示例:

jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html