我尝试将一些数据插入到MariaDB数据库中。我有两个表,我必须将行(使用批插入)插入到第一个表中,并使用新插入行的ID对第二个表执行第二次批插入。
我在Scala中使用了Alpakka Slick。为了回答这个问题,我们将tests
称为主表,将dependent
称为第二个表。
目前,我的算法如下:
1.将行插入tests
1.使用SELECT LAST_INSERT_ID();
获取批处理中第一行的ID
1.知道第一行的ID和批处理中的行数后,手动计算其他ID,并将它们用于在第二个表中插入
一次只有一个连接的情况下,这种方法非常有效。但是,我尝试模拟一个同时进行多次尝试写入的场景。为此,我使用Scala并行集合和Akka Stream Source
,如下所示:
// three sources of 10 random Strings each
val sources = Seq.fill(3)(Source(Seq.fill(10)(Random.alphanumeric.take(3).mkString))).zipWithIndex
val parallelSources: ParSeq[(Source[String, NotUsed], Int)] = sources.par
parallelSources.map { case (source, i) =>
source
.grouped(ChunkSize) // performs batch inserts of a given size
.via(insert(i))
.zipWithIndex
.runWith(Sink.foreach { case (_, chunkIndex) => println(s"Chunk $chunkIndex of source $i done") })
}
我为每个Source
添加一个索引,只是为了在写入DB的数据中使用它作为前缀。
下面是我到目前为止编写的insert
Flow
的代码:
def insert(srcIndex: Int): Flow[Seq[String], Unit, NotUsed] = {
implicit val insertSession: SlickSession = slickSession
system.registerOnTermination(() => insertSession.close())
Flow[Seq[String]]
.via(Slick.flowWithPassThrough { chunk =>
(for {
// insert data into `tests`
_ <- InsTests ++= chunk.map(v => TestProj(s"source$srcIndex-$v"))
// fetch last insert ID and connection ID
queryResult <- sql"SELECT CONNECTION_ID(), LAST_INSERT_ID();".as[(Long, Long)].headOption
_ <- queryResult match {
case Some((connId, firstIdInChunk)) =>
println(s"Source $srcIndex, last insert ID $firstIdInChunk, connection $connId")
// compute IDs by hand and write to `dependent`
val depValues = Seq.fill(ChunkSize)(s"source$srcIndex-${Random.alphanumeric.take(6).mkString}")
val depRows =
(firstIdInChunk to (firstIdInChunk + ChunkSize))
.zip(depValues)
.map { case (index, value) => DependentProj(index, value) }
InsDependent ++= depRows
case None => DBIO.failed(new Exception("..."))
}
} yield ()).transactionally
})
}
其中,InsTests
和InsDependent
是Slick的TableQuery
对象。slickSession
为每个不同的插入创建一个新会话,定义如下:
private def slickSession = {
val db = Database.forURL(
url = "jdbc:mariadb://localhost:3306/test",
user = "root",
password = "password",
executor = AsyncExecutor(
name = "executor",
minThreads = 20,
maxThreads = 20,
queueSize = 1000,
maxConnections = 20
)
)
val profile = slick.jdbc.MySQLProfile
SlickSession.forDbAndProfile(db, profile)
}
问题是算法的第二步返回的最后一个插入ID重叠。每次运行此应用程序都会打印如下内容:
Source 2, last insert ID 6, connection 66
Source 1, last insert ID 5, connection 68
Source 0, last insert ID 7, connection 67
Chunk 0 of source 0 done
Chunk 0 of source 2 done
Chunk 0 of source 1 done
Source 2, last insert ID 40, connection 70
Source 0, last insert ID 26, connection 69
Source 1, last insert ID 27, connection 71
Chunk 1 of source 2 done
Chunk 1 of source 1 done
Chunk 1 of source 0 done
看起来每个Source
的连接都不同,但ID重叠(源0显示7
,源1显示5
,源2显示2
)。ID从5
开始是正确的,因为我在创建表后立即添加了4个虚拟行很明显,我在dependent
中看到了多个具有相同tests.id
的行,这是不应该发生的。
我的理解是,最后插入的ID指的是单个连接,考虑到整个流都被 Package 在一个事务中(通过Slick的transactionally
),三个不同的连接怎么可能看到重叠的ID呢?innodb_autoinc_lock_mode=1
会发生这种情况,而innodb_autoinc_lock_mode=0
不会发生这种情况,这是有道理的,因为InnoDB会锁定tests
,直到整个批插入操作终止。
更新在Georg回答后:对于项目中的一些其他限制,我希望解决方案能够与MariaDB 10.4兼容,据我所知,MariaDB 10.4不支持INSERT...RETURNING
。此外,Slick的++=
操作符对returning
的支持非常差,也报告了here。我在MariaDB 10.4和10.5上测试了它,并且,根据查询日志,Slick确实执行单个INSERT INTO
语句,而不是批处理语句,但在我的情况下,这不是完全可以接受的,因为我计划以流的方式写入多个行块。
虽然我也知道假设自动增量值为1
并不理想,但我们确实可以控制生产设置,并且不具有多主机复制。
1条答案
按热度按时间ecr0jaav1#
您无法根据LAST_INSERT_ID()生成后续值:
1.可能会有第二个事务在同一时间运行并回滚,因此在auto_incremented ID中会有一个间隙。
1.通过递增LAST_INSERT_ID值来迭代行数将不起作用,因为它取决于会话变量@@auto_increment_increment的值(尤其是在多主机复制中,该值不是1)。
相反,您应该使用RETURNING来获取插入行得ID: