从“large”future.sequence上的slick.util.asyncexecutor拒绝

xggvc2p6  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(335)

我花了一整天的时间想办法解决这个问题。
其目的是将多个字符串序列插入表的一列中。
我有这样的方法:

case class Column(strings: Seq[String])

def insertColumns(columns: Seq[Column]) = for {
_ <- Future.sequence(columns.map(col => insert(col)))
} yield()

private def insert(column: Column) =
  db.run((stringTable ++= rows)) //slick batch insert

这在一定程度上起作用了。我测试了2100个列(每个列有100个字符串)的序列,结果很好。但是一旦我把列的数量增加到3100+,我就有了这个错误

Task slick.basic.BasicBackend$DatabaseDef$$anon$3@293ce053 rejected from slick.util.AsyncExecutor$$anon$1$$anon$2@3e423930[Running, pool size = 10, active threads = 10, queued tasks = 1000, completed tasks = 8160]

我在好几个地方读到过这样做会有帮助

case class Column(strings: Seq[String])

val f = Future.sequence(columns.map(col => insert(col)))

def insertColumns(columns: Seq[Column]) = for {
_ <- f
} yield()

private def insert(column: Column) =
  db.run((stringTable ++= rows)) //slick batch insert

它没有。
我尝试了几次内部的组合变化 insert ```
Future.sequence(
rows.grouped(500).toSeq.map(group => db.run(DBIO.seq(stringTable ++= group)))
)

Source(rows).buffer(500, OverflowStrategy.backpressure)
.via(
Slick.flow(row => stringTable += row)
)
.log("nr-of-inserted-rows")
.runWith(Sink.ignore)

Source(rows)
.runWith(Slick.sink(1, row => stringTable += row))

我试过:
不使用 `reWriteBatchedInserts=true` 在我的配置中 `(dataColumnStringsTable ++= rows).transactionally` 选项
使用特定的执行上下文启用单个线程: `implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))` 尝试按顺序执行未来
除了修改订阅服务器以接收和阻止我的消息(字符串序列)以及处理队列消息传递端的背压之外,我没有其他想法。
我正在使用slick(带alpakka slick)3.3.3/hikaricp 3.2.0/postgres 13.2
我的配置就是这样

slick {
profile = "slick.jdbc.PostgresProfile$"
db {
connectionPool = "HikariCP"
dataSourceClass = "slick.jdbc.DriverDataSource"
properties = {
driver = "org.postgresql.Driver"
user = "postgres"
password = "password"
url = "jdbc:postgresql://"${slick.db.host}":5432/slick?reWriteBatchedInserts=true"
}
host = "localhost"
numThreads = 10
maxConnections = 100
minConnections = 1
}
}

谢谢你的帮助。
r6l8ljro

r6l8ljro1#

你不应该使用 Future.sequence 包含多个元素的集合。每 Future 是在后台运行的计算。所以当你用这个来收集,比如说,3000个 columns :

Future.sequence(columns.map(col => insert(col)))

你一次有效地产生3000个操作。因此,执行者可能会开始拒绝新任务。
解决方案是用akka流处理输入集合。在您的情况下,这意味着创建一个 Sourcecolumns (不是来自 rows ). 这将确保执行器不会被太多并行操作所淹没。我没用过 alpakka-slick ,但看看文档,解决方案应该是这样的:

Source(columns)
  .via(
    Slick.flow(column => stringTable ++= column.rows) 
  )
  // further processing here

此外,如果“列”来自消息队列,那么您甚至可能不需要中间层 Seq[Column] . 您可能只需要定义 SourceColumn 它从队列中读取数据,并以平滑的流进行处理。

相关问题