在 map / mapPartitions 上下文中初始化数据库连接的 SPARK 成本

SPARK Cost of Initializing Database Connection in map / mapPartitions context

例子借鉴自网络,感谢高手

可以在各种论坛上找到与 mapPartitions 和 map 有关的以下内容:

... Consider the case of Initializing a database. If we are using map() or 
foreach(), the number of times we would need to initialize will be equal to 
the no of elements in RDD. Whereas if we use mapPartitions(), the no of times 
we would need to initialize would be equal to number of Partitions ...

然后就是这个回复:

val newRd = myRdd.mapPartitions(
  partition => {

    val connection = new DbConnection /*creates a db connection per partition*/

    val newPartition = partition.map(
       record => {
         readMatchingFromDB(record, connection)
     })
    connection.close()
    newPartition
  })

所以,我的问题是在阅读了与此相关的各种项目的讨论之后:

  1. 虽然我可以理解一般使用 mapPartitions 的性能改进,但为什么根据第一段文本,每次使用 map 为 RDD 的每个元素调用数据库连接?我似乎找不到合适的理由。
  2. sc.textFile 不会发生同样的事情......并从 jdbc 连接读取数据帧。或者是吗?如果真是这样,我会感到非常惊讶。

我错过了什么......?

首先这个代码是不正确的。虽然它看起来像是 established pattern for foreachPartition 的改编版,但它不能像这样与 mapPartitions 一起使用。

记住 foreachPartition 需要 Iterator[_] 和 returns Iterator[_],其中 Iterator.map 是惰性的,所以这段代码在实际使用之前关闭连接.

要使用在 mapPartitions 中初始化的某种形式的资源,您必须以某种方式使用设计您的代码,不需要显式释放资源。

the first snippet of text, the database connection be called every time for each element of an RDD using map? I can't seem to find the right reason.

如果没有相关代码段,答案必须是通用的 - mapforeach 并非旨在处理外部状态。在您的问题中显示 API 后,您必须:

rdd.map(record => readMatchingFromDB(record, new DbConnection))

以明显的方式为每个元素创建连接。

不是不可能使用例如单例连接池,做类似的事情:

object Pool {
  lazy val pool = ???
}

rdd.map(record => readMatchingFromDB(record, pool.getConnection))

但要做到这一点并不总是那么容易(考虑线程安全)。而且因为连接和类似的对象,一般不能序列化,我们不能只使用闭包。

相比之下,foreachPartition 模式既明确又简单。

当然可以强制急切执行使事情正常进行,例如:

val newRd = myRdd.mapPartitions(
  partition => {

    val connection = new DbConnection /*creates a db connection per partition*/

    val newPartition = partition.map(
       record => {
         readMatchingFromDB(record, connection)
    }).toList
    connection.close()
    newPartition.toIterator
  })

但这当然有风险,实际上会降低性能。

The same things does not happen with sc.textFile ... and reading into dataframes from jdbc connections. Or does it?

两者都使用低得多的操作 API,但当然不会为每条记录初始化资源。

在我看来,连接应该保持在外面,并且只在映射之前创建一次并关闭 post 任务完成。

val connection = new DbConnection /为每个分区创建一个数据库连接/

val newRd = myRdd.mapPartitions(
  partition => {    

    val newPartition = partition.map(
       record => {
         readMatchingFromDB(record, connection)
     })

    newPartition
  })

connection.close()