与 SQL 服务器的 Spark JDBC 连接经常超时

Spark JDBC connection to SQL Server times out often

我是 运行 Spark v2.2.1 通过 sparklyr v0.6.2 并通过 jdbc 从 SQL 服务器拉取数据。我似乎遇到了一些网络问题,因为 很多次 (不是每次)我的执行者正在写入 SQL 服务器失败并出现错误:

Prelogin error: host <my server> port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:...

我是 运行 我的 sparklyr 会话,配置如下:

spark_conf = spark_config()
spark_conf$spark.executor.cores <- 8
spark_conf$`sparklyr.shell.driver-memory` <- "8G"
spark_conf$`sparklyr.shell.executor-memory` <- "12G"
spark_conf$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"
spark_conf$spark.network.timeout <- 400

但有趣的是,根据执行程序日志,我在上面设置的网络超时似乎并不适用:

18/06/11 17:53:44 INFO BlockManager: Found block rdd_9_16 locally
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:3 ClientConnectionId: d3568a9f-049f-4772-83d4-ed65b907fc8b Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:d3568a9f-049f-4772-83d4-ed65b907fc8b
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:2 ClientConnectionId: ecb084e6-99a8-49d1-9215-491324e8d133 Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:ecb084e6-99a8-49d1-9215-491324e8d133
18/06/11 17:53:45 ERROR Executor: Exception in task 10.0 in stage 26.0 (TID 77)

谁能帮我理解什么是登录前错误以及如何避免这个问题?这是我的写函数:

function (df, tbl, db, server = NULL, user, pass, mode = "error", 
    options = list(), ...) 
{
    sparklyr::spark_write_jdbc(
  df, 
  tbl, 
  options = c(
    list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;", 
         "databaseName=", db, ";", 
         "user=", user, ";", 
         "password=", pass, ";"), 
       driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"), 
    options), 
  mode = mode, ...)
}

我刚刚将我的 jdbc 驱动程序更新到 6.0 版,但我认为这没有什么不同。我希望我安装正确。我只是将它放入我的 Spark/jars 文件夹,然后将其添加到 Spark/conf/spark-defaults.conf.

编辑 我正在将 24 个分区中的 2300 万行读入 Spark。我的集群有 4 个节点,每个节点有 8 个内核和 18G 内存。在我当前的配置下,我有 4 个执行器,每个执行器有 8 个内核,每个执行器有 12G。我读取数据的函数如下所示:

function (sc, tbl, db, server = NULL, user, pass, repartition = 0, options = list(), ...) 
{
    sparklyr::spark_read_jdbc(
      sc, 
      tbl, 
      options = c(
        list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;"), 
             user = user, 
             password = pass, 
             databaseName = db, 
             dbtable = tbl, 
             driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"), 
        options), 
      repartition = repartition, ...)
}

我在运行时将repartition设置为24。因此,我没有看到与建议的 post 的联系。

编辑 2
我能够通过摆脱重新分区来解决我的问题。谁能解释为什么在这种情况下使用 sparklyr 重新分区无效?

如解释的那样 in the other question, as well as some other posts (, Converting mysql table to spark dataset is very slow compared to same from csv file, , spark reading data from mysql in parallel) and off-site resources (Parallelizing Reads),默认情况下 Spark JDBC 源将所有数据按顺序读取到单个节点中。

并行化读取有两种方式:

  • 基于需要 lowerBoundupperBoundpartitionColumnnumPartitions options 的数字列的范围拆分,其中 partitionColumn 是一个稳定的数字列 (pseudocolumns might not be a good choice)

    spark_read_jdbc(
      ...,
      options = list(
        ...
        lowerBound = "0",                 # Adjust to fit your data 
        upperBound = "5000",              # Adjust to fit your data 
        numPartitions = "42",             # Adjust to fit your data and resources
        partitionColumn = "some_numeric_column"
      )
     )
    
  • predicates 列表 - 目前 sparklyr 不支持。

重新分区(sparklyr::sdf_repartition 不能解决问题,因为它发生在数据加载之后。由于 shuffle(repartition 需要)属于 Spark 中最昂贵的操作,它很容易崩溃节点。

结果使用:

  • repartition spark_read_jdbc的参数:

  • sdf_repartition

只是一种货物崇拜行为,大多数时候弊大于利。如果数据小到可以通过单个节点传输,那么增加分区数量通常会降低性能。否则它只会崩溃。

话虽这么说 - 如果数据已经由单个节点处理,则会引发一个问题,即使用 Apache Spark 是否有意义。答案将取决于您管道的其余部分,但仅考虑有问题的组件,它可能是否定的。