使用 R 的 pool 包重新连接 PostgreSQL 数据库
Reconnect to PostgreSQL database with R's pool package
我有一个用 R plumber 构建的 API,它通过 RPostgreSQL 和 pool 连接到 PostgreSQL 数据库(不过即使我使用的是 Shiny 应用程序,情况也一样):
# Build the shared connection pool; host and credentials come from the
# environment, while the port and database name are fixed.
pool <- dbPool(
  drv = PostgreSQL(),
  host = Sys.getenv("DB_HOST"),
  port = 5432,
  dbname = "db",
  user = Sys.getenv("DB_USER"),
  password = Sys.getenv("DB_PASSWORD")
)

# Create the API object from plumber.R.
pr <- plumb("plumber.R")

# Close the pool from the API's "exit" hook.
pr$registerHooks(
  list(exit = function() poolClose(pool))
)
我想每天导入新数据。最简单的方法是创建一个新数据库,然后将其切换为生产数据库:
-- Build the replacement database alongside the live one.
CREATE DATABASE db_new;
-- create the tables
-- bulk-insert the data

-- Disconnect the sessions attached to the live database. Excluding
-- pg_backend_pid() keeps this statement from terminating the session that is
-- running the swap itself.
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = 'db'
  AND pid <> pg_backend_pid();

-- Replace the live database with the freshly loaded one.
DROP DATABASE db;
ALTER DATABASE db_new RENAME TO db;
这样做速度很快,并且可以最大限度地减少停机时间。问题是 pool 随后会失去与数据库的连接,并且不会自动尝试重新连接:
> tbl(pool, "users")
Error in postgresqlExecStatement(conn, statement, ...) :
RS-DBI driver: (could not Retrieve the result : FATAL: terminating connection due to administrator command
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
)
即使我不是每天都更换数据库,数据库服务器偶尔也会重启,这同样会导致我的应用程序崩溃。自动重新连接似乎不是 pool、RPostgreSQL 或 DBI 提供的功能。有谁知道解决这个问题的方法吗?
我最近遇到了类似的问题:MySQL 连接在超过实例的 wait_timeout 后被关闭。我看到了您在 RStudio Community 上的帖子,并受到了您的解决方案的启发。如果您仍在使用该方案,并且希望找到一种在包装实际使用的函数的同时避免额外查询的办法,下面这个 reprex 展示了我的做法,并附有一个证明它有效的例子:
library(dplyr, warn.conflicts = FALSE)
library(pool)
library(RMariaDB)
#' Build a query wrapper that retries once after a dropped connection.
#'
#' The returned function calls `db_function(pool, ...)`. If that call fails
#' with an error whose message matches `error_pattern`, the wrapper checks a
#' connection out of the pool with `validationInterval` temporarily set to 0
#' (the original author's way of getting the dead connection destroyed),
#' returns it, restores the interval, and runs `db_function(pool, ...)` a
#' second time. Any other error is re-signalled unchanged.
#'
#' @param pool Pool object passed as the first argument to every wrapped call.
#'   It must expose a writable `validationInterval` field.
#' @param error_pattern Regular expression matched against the error message
#'   to decide whether the failure is a lost connection. Defaults to the
#'   MySQL/MariaDB message; pass a different pattern for other drivers.
#' @return A function `function(db_function, ...)` returning whatever
#'   `db_function(pool, ...)` returns.
generate_safe_query <- function(
    pool,
    error_pattern = "Lost connection to MySQL server during query") {
  # Evaluate the arguments now so the closure never sees a later rebinding.
  force(pool)
  force(error_pattern)

  function(db_function, ...) {
    tryCatch(
      db_function(pool, ...),
      error = function(e) {
        if (!grepl(error_pattern, conditionMessage(e))) {
          # Unexpected error: hand the original condition back to the caller.
          stop(e)
        }

        # Preserve `validationInterval` so that it can be restored.
        validation_interval <- pool$validationInterval
        pool$validationInterval <- 0
        tryCatch(
          {
            # Trigger destruction of the dead connection.
            refreshed_connection <- poolCheckout(pool)
            poolReturn(refreshed_connection)
          },
          finally = {
            # Restore the interval even when the checkout itself fails;
            # otherwise the pool would be left with an interval of 0.
            pool$validationInterval <- validation_interval
          }
        )

        # Execute the query again, now that the pool has been refreshed.
        db_function(pool, ...)
      }
    )
  }
}
# Open a pool against the local MySQL test database and wrap it.
mysql_pool <- dbPool(
  drv = MariaDB(),
  host = "127.0.0.1",
  username = "root",
  password = "",
  dbname = "test"
)
safe_query <- generate_safe_query(mysql_pool)

# Baseline: the query works on a healthy connection.
safe_query(tbl, "notes")
#> # Source:   table<notes> [?? x 2]
#> # Database: mysql 8.0.15 [root@127.0.0.1:/test]
#>      id note
#>   <int> <chr>
#> 1     1 NOTE1

# Shorten `wait_timeout` to 5 seconds for this session ...
invisible(
  safe_query(dbExecute, "SET SESSION wait_timeout = 5")
)

# ... and idle for longer than that so the connection gets dropped.
Sys.sleep(6)

# The query still works; a warning reports that the connection was
# destroyed and replaced with a new one.
safe_query(tbl, "notes")
#> Warning: It wasn't possible to activate and/or validate the object. Trying
#> again with a new object.
#> # Source:   table<notes> [?? x 2]
#> # Database: mysql 8.0.15 [root@127.0.0.1:/test]
#>      id note
#>   <int> <chr>
#> 1     1 NOTE1

# Close the pool through the wrapper.
safe_query(poolClose)
# Or, equivalently:
# poolClose(mysql_pool)
由 reprex package (v0.3.0) 于 2019-05-30 创建
generate_safe_query 返回的函数适用于任何数据库查询函数(例如 dbExecute、dbGetQuery 等)。显然,您需要根据自己的需要修改它所匹配的错误消息。
我还发起了自己的 Community topic,讨论一个我认为应该包含在 dbPool 中的选项,它可以减少对此类变通方法的需求。
我使用的是普通 DBI(不使用 pool),并通过下面的函数确保每次 DBI 调用都能获得可用的连接(例如 DBI::dbExistsTable(rdsConnect(), "mytable"))。
#' Return a PostgreSQL connection, reusing the cached one when possible.
#'
#' Reuses the connection stored in `rds` when that object exists and
#' `isPostgresqlIdCurrent()` reports it as current. Otherwise sources
#' `./config.R` into the function's environment, opens a new connection with
#' the values returned by `rds_params()` (presumably defined by that config
#' file -- confirm), and stores it in `rds` with `<<-` for later calls.
#'
#' FIXME: dbIsValid is not implemented by RPostgreSQL, see
#' https://github.com/tomoakin/RPostgreSQL/issues/76
#' isPostgresqlIdCurrent() is used as a workaround.
#'
#' @return The connection held in `rds`.
rdsConnect <- function() {
  has_current_handle <- exists("rds") && isPostgresqlIdCurrent(rds)

  if (has_current_handle) {
    print("Valid PostgreSQL connection")
    return(rds)
  }

  # Configuration is loaded only when a new connection has to be opened.
  source('./config.R', local = TRUE)
  print("New PostgreSQL connection")
  rds <<- DBI::dbConnect(
    RPostgreSQL::PostgreSQL(),
    dbname = rds_params("rds_database"),
    host = rds_params("rds_host"),
    user = rds_params("rds_user"),
    password = rds_params("rds_password")
  )
  rds
}
我有一个用 R plumber 构建的 API,它通过 RPostgreSQL 和 pool 连接到 PostgreSQL 数据库(不过即使我使用的是 Shiny 应用程序,情况也一样):
# Build the shared connection pool; host and credentials come from the
# environment, while the port and database name are fixed.
pool <- dbPool(
  drv = PostgreSQL(),
  host = Sys.getenv("DB_HOST"),
  port = 5432,
  dbname = "db",
  user = Sys.getenv("DB_USER"),
  password = Sys.getenv("DB_PASSWORD")
)

# Create the API object from plumber.R.
pr <- plumb("plumber.R")

# Close the pool from the API's "exit" hook.
pr$registerHooks(
  list(exit = function() poolClose(pool))
)
我想每天导入新数据。最简单的方法是创建一个新数据库,然后将其切换为生产数据库:
-- Build the replacement database alongside the live one.
CREATE DATABASE db_new;
-- create the tables
-- bulk-insert the data

-- Disconnect the sessions attached to the live database. Excluding
-- pg_backend_pid() keeps this statement from terminating the session that is
-- running the swap itself.
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = 'db'
  AND pid <> pg_backend_pid();

-- Replace the live database with the freshly loaded one.
DROP DATABASE db;
ALTER DATABASE db_new RENAME TO db;
这样做速度很快,并且可以最大限度地减少停机时间。问题是 pool 随后会失去与数据库的连接,并且不会自动尝试重新连接:
> tbl(pool, "users")
Error in postgresqlExecStatement(conn, statement, ...) :
RS-DBI driver: (could not Retrieve the result : FATAL: terminating connection due to administrator command
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
)
即使我不是每天都更换数据库,数据库服务器偶尔也会重启,这同样会导致我的应用程序崩溃。自动重新连接似乎不是 pool、RPostgreSQL 或 DBI 提供的功能。有谁知道解决这个问题的方法吗?
我最近遇到了类似的问题:MySQL 连接在超过实例的 wait_timeout 后被关闭。我看到了您在 RStudio Community 上的帖子,并受到了您的解决方案的启发。如果您仍在使用该方案,并且希望找到一种在包装实际使用的函数的同时避免额外查询的办法,下面这个 reprex 展示了我的做法,并附有一个证明它有效的例子:
library(dplyr, warn.conflicts = FALSE)
library(pool)
library(RMariaDB)
#' Build a query wrapper that retries once after a dropped connection.
#'
#' The returned function calls `db_function(pool, ...)`. If that call fails
#' with an error whose message matches `error_pattern`, the wrapper checks a
#' connection out of the pool with `validationInterval` temporarily set to 0
#' (the original author's way of getting the dead connection destroyed),
#' returns it, restores the interval, and runs `db_function(pool, ...)` a
#' second time. Any other error is re-signalled unchanged.
#'
#' @param pool Pool object passed as the first argument to every wrapped call.
#'   It must expose a writable `validationInterval` field.
#' @param error_pattern Regular expression matched against the error message
#'   to decide whether the failure is a lost connection. Defaults to the
#'   MySQL/MariaDB message; pass a different pattern for other drivers.
#' @return A function `function(db_function, ...)` returning whatever
#'   `db_function(pool, ...)` returns.
generate_safe_query <- function(
    pool,
    error_pattern = "Lost connection to MySQL server during query") {
  # Evaluate the arguments now so the closure never sees a later rebinding.
  force(pool)
  force(error_pattern)

  function(db_function, ...) {
    tryCatch(
      db_function(pool, ...),
      error = function(e) {
        if (!grepl(error_pattern, conditionMessage(e))) {
          # Unexpected error: hand the original condition back to the caller.
          stop(e)
        }

        # Preserve `validationInterval` so that it can be restored.
        validation_interval <- pool$validationInterval
        pool$validationInterval <- 0
        tryCatch(
          {
            # Trigger destruction of the dead connection.
            refreshed_connection <- poolCheckout(pool)
            poolReturn(refreshed_connection)
          },
          finally = {
            # Restore the interval even when the checkout itself fails;
            # otherwise the pool would be left with an interval of 0.
            pool$validationInterval <- validation_interval
          }
        )

        # Execute the query again, now that the pool has been refreshed.
        db_function(pool, ...)
      }
    )
  }
}
# Open a pool against the local MySQL test database and wrap it.
mysql_pool <- dbPool(
  drv = MariaDB(),
  host = "127.0.0.1",
  username = "root",
  password = "",
  dbname = "test"
)
safe_query <- generate_safe_query(mysql_pool)

# Baseline: the query works on a healthy connection.
safe_query(tbl, "notes")
#> # Source:   table<notes> [?? x 2]
#> # Database: mysql 8.0.15 [root@127.0.0.1:/test]
#>      id note
#>   <int> <chr>
#> 1     1 NOTE1

# Shorten `wait_timeout` to 5 seconds for this session ...
invisible(
  safe_query(dbExecute, "SET SESSION wait_timeout = 5")
)

# ... and idle for longer than that so the connection gets dropped.
Sys.sleep(6)

# The query still works; a warning reports that the connection was
# destroyed and replaced with a new one.
safe_query(tbl, "notes")
#> Warning: It wasn't possible to activate and/or validate the object. Trying
#> again with a new object.
#> # Source:   table<notes> [?? x 2]
#> # Database: mysql 8.0.15 [root@127.0.0.1:/test]
#>      id note
#>   <int> <chr>
#> 1     1 NOTE1

# Close the pool through the wrapper.
safe_query(poolClose)
# Or, equivalently:
# poolClose(mysql_pool)
由 reprex package (v0.3.0) 于 2019-05-30 创建
generate_safe_query 返回的函数适用于任何数据库查询函数(例如 dbExecute、dbGetQuery 等)。显然,您需要根据自己的需要修改它所匹配的错误消息。
我还发起了自己的 Community topic,讨论一个我认为应该包含在 dbPool 中的选项,它可以减少对此类变通方法的需求。
我使用的是普通 DBI(不使用 pool),并通过下面的函数确保每次 DBI 调用都能获得可用的连接(例如 DBI::dbExistsTable(rdsConnect(), "mytable"))。
#' Return a PostgreSQL connection, reusing the cached one when possible.
#'
#' Reuses the connection stored in `rds` when that object exists and
#' `isPostgresqlIdCurrent()` reports it as current. Otherwise sources
#' `./config.R` into the function's environment, opens a new connection with
#' the values returned by `rds_params()` (presumably defined by that config
#' file -- confirm), and stores it in `rds` with `<<-` for later calls.
#'
#' FIXME: dbIsValid is not implemented by RPostgreSQL, see
#' https://github.com/tomoakin/RPostgreSQL/issues/76
#' isPostgresqlIdCurrent() is used as a workaround.
#'
#' @return The connection held in `rds`.
rdsConnect <- function() {
  has_current_handle <- exists("rds") && isPostgresqlIdCurrent(rds)

  if (has_current_handle) {
    print("Valid PostgreSQL connection")
    return(rds)
  }

  # Configuration is loaded only when a new connection has to be opened.
  source('./config.R', local = TRUE)
  print("New PostgreSQL connection")
  rds <<- DBI::dbConnect(
    RPostgreSQL::PostgreSQL(),
    dbname = rds_params("rds_database"),
    host = rds_params("rds_host"),
    user = rds_params("rds_user"),
    password = rds_params("rds_password")
  )
  rds
}