RPostgreSQL 连接一旦用 doParallel clusterEvalQ 启动就会过期

RPostgreSQL connections are expired as soon as they are initiated with doParallel clusterEvalQ

我正在尝试设置一个并行任务,每个工作人员都需要进行数据库查询。我正在尝试为每个工作人员设置连接,如 this question 中所示,但每次我尝试 returns <Expired PostgreSQLConnection:(2781,0)> 时,无论我注册了多少工作人员。

这是我的代码:

cl <- makeCluster(detectCores())
registerDoParallel(cl)

clusterEvalQ(cl, {
  library(RPostgreSQL)
  drv<-dbDriver("PostgreSQL")
  con<-dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")

})

如果我尝试 运行 我的 foreach 尽管有错误,它会失败 task 1 failed - "expired PostgreSQLConnection"

当我进入 postgres 服务器状态时,它会显示所有已创建的活动会话。

我从我的主 R 实例与 postgres 交互没有任何问题。

如果我运行

clusterEvalQ(cl, {
  library(RPostgreSQL)
  drv<-dbDriver("PostgreSQL")
  con<-dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")
  dbGetQuery(con, "select inet_client_port()")

})

然后它将 return 所有客户端端口。它没有给我过期通知,但如果我尝试 运行 我的 foreach 命令,它将失败并出现相同的错误。

编辑:

我已经在 Ubuntu 和 2 windows 台电脑上试过了,它们都给出了同样的错误。

另一个编辑:

现在 3 windows 台计算机

我能够在本地重现您的问题。我不完全确定,但我认为问题与 clusterEvalQ 内部工作方式有关。比如你说 dbGetQuery(con, "select inet_client_port()) 给了你客户端端口输出。如果查询实际上是集群节点上的 evaluated/executed,那么您将无法看到此输出(就像您无法直接读取在外部集群节点上执行的任何其他输出或打印语句一样)。

因此,据我了解,评估首先以某种方式在本地环境上执行,相关函数和变量随后 copied/exported 到各个集群节点。这适用于任何其他类型的 functions/variables 但显然不适用于数据库连接。如果 connections/portmappings 链接到主 R 实例,那么从属实例的连接将不起作用。如果您尝试使用 clusterExport 函数来导出在主实例上创建的连接,您也会得到完全相同的错误。

作为替代方案,您可以在各个 foreach 任务中创建单独的连接。我已通过本地数据库验证以下内容有效:

library(doParallel)
nrCores = detectCores()
cl <- makeCluster(nrCores)
registerDoParallel(cl)
clusterEvalQ(cl,library(RPostgreSQL))
clusterEvalQ(cl,library(DBI))

result <- foreach(i=1:nrCores) %dopar%
{
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")
  queryResult <- dbGetQuery(con, "fetch something...")
  dbDisconnect(con)
  return(queryResult)
}
stopCluster(cl)

但是,现在您必须考虑到每 foreach 次迭代都会创建和断开新连接。因此,您可能会产生一些性能开销。您显然可以通过智能拆分 queries/data 来避免这种情况,以便在同一迭代中完成大量工作。理想情况下,您应该将工作完全按照可用的内核数进行拆分。