在 "large" Future.sequence 被 slick.util.AsyncExecutor 拒绝
rejected from slick.util.AsyncExecutor on "large" Future.sequence
我花了一整天时间想办法解决这个问题。
目的是将几个字符串序列插入到 table 的单个列中。
我有这样的方法:
case class Column(strings: Seq[String])
def insertColumns(columns: Seq[Column]) = for {
_ <- Future.sequence(columns.map(col => insert(col)))
} yield()
private def insert(column: Column) =
db.run((stringTable ++= rows)) //slick batch insert
这在一定程度上起到了作用。
我测试了 2100 列的序列(每列有 100 个字符串),它工作正常。
但是一旦我将列数增加到 3100+,就会出现此错误
Task slick.basic.BasicBackend$DatabaseDef$$anon@293ce053 rejected from slick.util.AsyncExecutor$$anon$$anon@3e423930[Running, pool size = 10, active threads = 10, queued tasks = 1000, completed tasks = 8160]
我在几个地方读到过这样做会有所帮助
case class Column(strings: Seq[String])
val f = Future.sequence(columns.map(col => insert(col)))
def insertColumns(columns: Seq[Column]) = for {
_ <- f
} yield()
private def insert(column: Column) =
db.run((stringTable ++= rows)) //slick batch insert
没有。
我在里面尝试了几种变化组合insert
Future.sequence(
rows.grouped(500).toSeq.map(group => db.run(DBIO.seq(stringTable ++= group)))
)
Source(rows).buffer(500, OverflowStrategy.backpressure)
.via(
Slick.flow(row => stringTable += row)
)
.log("nr-of-inserted-rows")
.runWith(Sink.ignore)
Source(rows)
.runWith(Slick.sink(1, row => stringTable += row))
我试过了:
- 不要在我的配置中使用
reWriteBatchedInserts=true
(dataColumnStringsTable ++= rows).transactionally
选项
- 使用特定的执行上下文启用单个线程:
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
尝试按顺序执行期货
除了修改我的订阅者以接收和阻止我的消息(字符串序列)并处理队列消息传递端的背压外,我没有任何其他想法。
我正在使用 slick(带有 alpakka-slick)3.3.3 / HikariCP 3.2.0 / Postgres 13.2
我的配置是这样的
slick {
profile = "slick.jdbc.PostgresProfile$"
db {
connectionPool = "HikariCP"
dataSourceClass = "slick.jdbc.DriverDataSource"
properties = {
driver = "org.postgresql.Driver"
user = "postgres"
password = "password"
url = "jdbc:postgresql://"${slick.db.host}":5432/slick?reWriteBatchedInserts=true"
}
host = "localhost"
numThreads = 10
maxConnections = 100
minConnections = 1
}
}
感谢您的帮助。
您不应将 Future.sequence
用于包含多个元素的集合。每个 Future
都是后台的计算 运行ning。所以当你 运行 这对于一个集合,比方说,3000 columns
:
Future.sequence(columns.map(col => insert(col)))
您一次有效地产生了 3000 个操作。因此,执行者可能会开始拒绝新任务。
解决办法是用Akka Streams处理输入集合。在您的情况下,这意味着从 columns
(而不是 rows
)创建一个 Source
。这将确保执行器不会被太多的并行操作淹没。我没用过 alpakka-slick
,但看看 docs,解决方案应该是这样的:
Source(columns)
.via(
Slick.flow(column => stringTable ++= column.rows)
)
// further processing here
此外,如果“列”来自消息队列,您甚至可能不需要中间层 Seq[Column]
。您可能只需要定义一个 Source
of Column
从队列中读取,并使用 Slick 流处理它。
我花了一整天时间想办法解决这个问题。
目的是将几个字符串序列插入到 table 的单个列中。
我有这样的方法:
case class Column(strings: Seq[String])
def insertColumns(columns: Seq[Column]) = for {
_ <- Future.sequence(columns.map(col => insert(col)))
} yield()
private def insert(column: Column) =
db.run((stringTable ++= rows)) //slick batch insert
这在一定程度上起到了作用。 我测试了 2100 列的序列(每列有 100 个字符串),它工作正常。 但是一旦我将列数增加到 3100+,就会出现此错误
Task slick.basic.BasicBackend$DatabaseDef$$anon@293ce053 rejected from slick.util.AsyncExecutor$$anon$$anon@3e423930[Running, pool size = 10, active threads = 10, queued tasks = 1000, completed tasks = 8160]
我在几个地方读到过这样做会有所帮助
case class Column(strings: Seq[String])
val f = Future.sequence(columns.map(col => insert(col)))
def insertColumns(columns: Seq[Column]) = for {
_ <- f
} yield()
private def insert(column: Column) =
db.run((stringTable ++= rows)) //slick batch insert
没有。
我在里面尝试了几种变化组合insert
Future.sequence(
rows.grouped(500).toSeq.map(group => db.run(DBIO.seq(stringTable ++= group)))
)
Source(rows).buffer(500, OverflowStrategy.backpressure)
.via(
Slick.flow(row => stringTable += row)
)
.log("nr-of-inserted-rows")
.runWith(Sink.ignore)
Source(rows)
.runWith(Slick.sink(1, row => stringTable += row))
我试过了:
- 不要在我的配置中使用
reWriteBatchedInserts=true
(dataColumnStringsTable ++= rows).transactionally
选项- 使用特定的执行上下文启用单个线程:
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
尝试按顺序执行期货
除了修改我的订阅者以接收和阻止我的消息(字符串序列)并处理队列消息传递端的背压外,我没有任何其他想法。
我正在使用 slick(带有 alpakka-slick)3.3.3 / HikariCP 3.2.0 / Postgres 13.2
我的配置是这样的
slick {
profile = "slick.jdbc.PostgresProfile$"
db {
connectionPool = "HikariCP"
dataSourceClass = "slick.jdbc.DriverDataSource"
properties = {
driver = "org.postgresql.Driver"
user = "postgres"
password = "password"
url = "jdbc:postgresql://"${slick.db.host}":5432/slick?reWriteBatchedInserts=true"
}
host = "localhost"
numThreads = 10
maxConnections = 100
minConnections = 1
}
}
感谢您的帮助。
您不应将 Future.sequence
用于包含多个元素的集合。每个 Future
都是后台的计算 运行ning。所以当你 运行 这对于一个集合,比方说,3000 columns
:
Future.sequence(columns.map(col => insert(col)))
您一次有效地产生了 3000 个操作。因此,执行者可能会开始拒绝新任务。
解决办法是用Akka Streams处理输入集合。在您的情况下,这意味着从 columns
(而不是 rows
)创建一个 Source
。这将确保执行器不会被太多的并行操作淹没。我没用过 alpakka-slick
,但看看 docs,解决方案应该是这样的:
Source(columns)
.via(
Slick.flow(column => stringTable ++= column.rows)
)
// further processing here
此外,如果“列”来自消息队列,您甚至可能不需要中间层 Seq[Column]
。您可能只需要定义一个 Source
of Column
从队列中读取,并使用 Slick 流处理它。