与 SQL 服务器的 Spark JDBC 连接经常超时
Spark JDBC connection to SQL Server times out often
我是 运行 Spark v2.2.1 通过 sparklyr v0.6.2 并通过 jdbc 从 SQL 服务器拉取数据。我似乎遇到了一些网络问题,因为 很多次 (不是每次)我的执行者正在写入 SQL 服务器失败并出现错误:
Prelogin error: host <my server> port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:...
我是 运行 我的 sparklyr 会话,配置如下:
spark_conf = spark_config()
spark_conf$spark.executor.cores <- 8
spark_conf$`sparklyr.shell.driver-memory` <- "8G"
spark_conf$`sparklyr.shell.executor-memory` <- "12G"
spark_conf$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"
spark_conf$spark.network.timeout <- 400
但有趣的是,根据执行程序日志,我在上面设置的网络超时似乎并不适用:
18/06/11 17:53:44 INFO BlockManager: Found block rdd_9_16 locally
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:3 ClientConnectionId: d3568a9f-049f-4772-83d4-ed65b907fc8b Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:d3568a9f-049f-4772-83d4-ed65b907fc8b
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:2 ClientConnectionId: ecb084e6-99a8-49d1-9215-491324e8d133 Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:ecb084e6-99a8-49d1-9215-491324e8d133
18/06/11 17:53:45 ERROR Executor: Exception in task 10.0 in stage 26.0 (TID 77)
谁能帮我理解什么是登录前错误以及如何避免这个问题?这是我的写函数:
function (df, tbl, db, server = NULL, user, pass, mode = "error",
options = list(), ...)
{
sparklyr::spark_write_jdbc(
df,
tbl,
options = c(
list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;",
"databaseName=", db, ";",
"user=", user, ";",
"password=", pass, ";"),
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"),
options),
mode = mode, ...)
}
我刚刚将我的 jdbc 驱动程序更新到 6.0 版,但我认为这没有什么不同。我希望我安装正确。我只是将它放入我的 Spark/jars
文件夹,然后将其添加到 Spark/conf/spark-defaults.conf
.
编辑
我正在将 24 个分区中的 2300 万行读入 Spark。我的集群有 4 个节点,每个节点有 8 个内核和 18G 内存。在我当前的配置下,我有 4 个执行器,每个执行器有 8 个内核,每个执行器有 12G。我读取数据的函数如下所示:
function (sc, tbl, db, server = NULL, user, pass, repartition = 0, options = list(), ...)
{
sparklyr::spark_read_jdbc(
sc,
tbl,
options = c(
list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;"),
user = user,
password = pass,
databaseName = db,
dbtable = tbl,
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"),
options),
repartition = repartition, ...)
}
我在运行时将repartition
设置为24。因此,我没有看到与建议的 post 的联系。
编辑 2
我能够通过摆脱重新分区来解决我的问题。谁能解释为什么在这种情况下使用 sparklyr 重新分区无效?
如解释的那样 in the other question, as well as some other posts (, Converting mysql table to spark dataset is very slow compared to same from csv file, , spark reading data from mysql in parallel) and off-site resources (Parallelizing Reads),默认情况下 Spark JDBC 源将所有数据按顺序读取到单个节点中。
并行化读取有两种方式:
基于需要 lowerBound
、upperBound
、partitionColumn
和 numPartitions
options
的数字列的范围拆分,其中 partitionColumn
是一个稳定的数字列 (pseudocolumns might not be a good choice)
spark_read_jdbc(
...,
options = list(
...
lowerBound = "0", # Adjust to fit your data
upperBound = "5000", # Adjust to fit your data
numPartitions = "42", # Adjust to fit your data and resources
partitionColumn = "some_numeric_column"
)
)
predicates
列表 - 目前 sparklyr
不支持。
重新分区(sparklyr::sdf_repartition
不能解决问题,因为它发生在数据加载之后。由于 shuffle(repartition
需要)属于 Spark 中最昂贵的操作,它很容易崩溃节点。
结果使用:
repartition
spark_read_jdbc
的参数:
sdf_repartition
只是一种货物崇拜行为,大多数时候弊大于利。如果数据小到可以通过单个节点传输,那么增加分区数量通常会降低性能。否则它只会崩溃。
话虽这么说 - 如果数据已经由单个节点处理,则会引发一个问题,即使用 Apache Spark 是否有意义。答案将取决于您管道的其余部分,但仅考虑有问题的组件,它可能是否定的。
我是 运行 Spark v2.2.1 通过 sparklyr v0.6.2 并通过 jdbc 从 SQL 服务器拉取数据。我似乎遇到了一些网络问题,因为 很多次 (不是每次)我的执行者正在写入 SQL 服务器失败并出现错误:
Prelogin error: host <my server> port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:...
我是 运行 我的 sparklyr 会话,配置如下:
spark_conf = spark_config()
spark_conf$spark.executor.cores <- 8
spark_conf$`sparklyr.shell.driver-memory` <- "8G"
spark_conf$`sparklyr.shell.executor-memory` <- "12G"
spark_conf$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"
spark_conf$spark.network.timeout <- 400
但有趣的是,根据执行程序日志,我在上面设置的网络超时似乎并不适用:
18/06/11 17:53:44 INFO BlockManager: Found block rdd_9_16 locally
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:3 ClientConnectionId: d3568a9f-049f-4772-83d4-ed65b907fc8b Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:d3568a9f-049f-4772-83d4-ed65b907fc8b
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:2 ClientConnectionId: ecb084e6-99a8-49d1-9215-491324e8d133 Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:ecb084e6-99a8-49d1-9215-491324e8d133
18/06/11 17:53:45 ERROR Executor: Exception in task 10.0 in stage 26.0 (TID 77)
谁能帮我理解什么是登录前错误以及如何避免这个问题?这是我的写函数:
function (df, tbl, db, server = NULL, user, pass, mode = "error",
options = list(), ...)
{
sparklyr::spark_write_jdbc(
df,
tbl,
options = c(
list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;",
"databaseName=", db, ";",
"user=", user, ";",
"password=", pass, ";"),
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"),
options),
mode = mode, ...)
}
我刚刚将我的 jdbc 驱动程序更新到 6.0 版,但我认为这没有什么不同。我希望我安装正确。我只是将它放入我的 Spark/jars
文件夹,然后将其添加到 Spark/conf/spark-defaults.conf
.
编辑 我正在将 24 个分区中的 2300 万行读入 Spark。我的集群有 4 个节点,每个节点有 8 个内核和 18G 内存。在我当前的配置下,我有 4 个执行器,每个执行器有 8 个内核,每个执行器有 12G。我读取数据的函数如下所示:
function (sc, tbl, db, server = NULL, user, pass, repartition = 0, options = list(), ...)
{
sparklyr::spark_read_jdbc(
sc,
tbl,
options = c(
list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;"),
user = user,
password = pass,
databaseName = db,
dbtable = tbl,
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"),
options),
repartition = repartition, ...)
}
我在运行时将repartition
设置为24。因此,我没有看到与建议的 post 的联系。
编辑 2
我能够通过摆脱重新分区来解决我的问题。谁能解释为什么在这种情况下使用 sparklyr 重新分区无效?
如解释的那样 in the other question, as well as some other posts (
并行化读取有两种方式:
基于需要
lowerBound
、upperBound
、partitionColumn
和numPartitions
options
的数字列的范围拆分,其中partitionColumn
是一个稳定的数字列 (pseudocolumns might not be a good choice)spark_read_jdbc( ..., options = list( ... lowerBound = "0", # Adjust to fit your data upperBound = "5000", # Adjust to fit your data numPartitions = "42", # Adjust to fit your data and resources partitionColumn = "some_numeric_column" ) )
predicates
列表 - 目前sparklyr
不支持。
重新分区(sparklyr::sdf_repartition
不能解决问题,因为它发生在数据加载之后。由于 shuffle(repartition
需要)属于 Spark 中最昂贵的操作,它很容易崩溃节点。
结果使用:
repartition
spark_read_jdbc
的参数:sdf_repartition
只是一种货物崇拜行为,大多数时候弊大于利。如果数据小到可以通过单个节点传输,那么增加分区数量通常会降低性能。否则它只会崩溃。
话虽这么说 - 如果数据已经由单个节点处理,则会引发一个问题,即使用 Apache Spark 是否有意义。答案将取决于您管道的其余部分,但仅考虑有问题的组件,它可能是否定的。