我花了一整天的时间想办法解决这个问题。
其目的是将多个字符串序列插入表的一列中。
我有这样的方法:
case class Column(strings: Seq[String])
def insertColumns(columns: Seq[Column]) = for {
_ <- Future.sequence(columns.map(col => insert(col)))
} yield()
private def insert(column: Column) =
db.run((stringTable ++= rows)) //slick batch insert
这在一定程度上起作用了。我测试了2100个列(每个列有100个字符串)的序列,结果很好。但是一旦我把列的数量增加到3100+,我就有了这个错误
Task slick.basic.BasicBackend$DatabaseDef$$anon$3@293ce053 rejected from slick.util.AsyncExecutor$$anon$1$$anon$2@3e423930[Running, pool size = 10, active threads = 10, queued tasks = 1000, completed tasks = 8160]
我在好几个地方读到过这样做会有帮助
case class Column(strings: Seq[String])
val f = Future.sequence(columns.map(col => insert(col)))
def insertColumns(columns: Seq[Column]) = for {
_ <- f
} yield()
private def insert(column: Column) =
db.run((stringTable ++= rows)) //slick batch insert
它没有。
我尝试了几次内部的组合变化 insert
```
Future.sequence(
rows.grouped(500).toSeq.map(group => db.run(DBIO.seq(stringTable ++= group)))
)
Source(rows).buffer(500, OverflowStrategy.backpressure)
.via(
Slick.flow(row => stringTable += row)
)
.log("nr-of-inserted-rows")
.runWith(Sink.ignore)
Source(rows)
.runWith(Slick.sink(1, row => stringTable += row))
我试过:
不使用 `reWriteBatchedInserts=true` 在我的配置中 `(dataColumnStringsTable ++= rows).transactionally` 选项
使用特定的执行上下文启用单个线程: `implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))` 尝试按顺序执行未来
除了修改订阅服务器以接收和阻止我的消息(字符串序列)以及处理队列消息传递端的背压之外,我没有其他想法。
我正在使用slick(带alpakka slick)3.3.3/hikaricp 3.2.0/postgres 13.2
我的配置就是这样
slick {
profile = "slick.jdbc.PostgresProfile$"
db {
connectionPool = "HikariCP"
dataSourceClass = "slick.jdbc.DriverDataSource"
properties = {
driver = "org.postgresql.Driver"
user = "postgres"
password = "password"
url = "jdbc:postgresql://"${slick.db.host}":5432/slick?reWriteBatchedInserts=true"
}
host = "localhost"
numThreads = 10
maxConnections = 100
minConnections = 1
}
}
谢谢你的帮助。
1条答案
按热度按时间r6l8ljro1#
你不应该使用
Future.sequence
包含多个元素的集合。每Future
是在后台运行的计算。所以当你用这个来收集,比如说,3000个columns
:你一次有效地产生3000个操作。因此,执行者可能会开始拒绝新任务。
解决方案是用akka流处理输入集合。在您的情况下,这意味着创建一个
Source
从columns
(不是来自rows
). 这将确保执行器不会被太多并行操作所淹没。我没用过alpakka-slick
,但看看文档,解决方案应该是这样的:此外,如果“列”来自消息队列,那么您甚至可能不需要中间层
Seq[Column]
. 您可能只需要定义Source
的Column
它从队列中读取数据,并以平滑的流进行处理。