使用Slick和MariaDB进行批量插入和LAST_INSERT_ID

fnx2tebb  于 2022-11-08  发布在  其他
关注(0)|答案(1)|浏览(137)

我尝试将一些数据插入到MariaDB数据库中。我有两个表,我必须将行(使用批插入)插入到第一个表中,并使用新插入行的ID对第二个表执行第二次批插入。
我在Scala中使用了Alpakka Slick。为了回答这个问题,我们将tests称为主表,将dependent称为第二个表。
目前,我的算法如下:
1.将行插入tests
1.使用SELECT LAST_INSERT_ID();获取批处理中第一行的ID
1.知道第一行的ID和批处理中的行数后,手动计算其他ID,并将它们用于在第二个表中插入
一次只有一个连接的情况下,这种方法非常有效。但是,我尝试模拟一个同时进行多次尝试写入的场景。为此,我使用Scala并行集合和Akka Stream Source,如下所示:

// three sources of 10 random Strings each
val sources = Seq.fill(3)(Source(Seq.fill(10)(Random.alphanumeric.take(3).mkString))).zipWithIndex
val parallelSources: ParSeq[(Source[String, NotUsed], Int)] = sources.par

parallelSources.map { case (source, i) =>
  source
    .grouped(ChunkSize) // performs batch inserts of a given size
    .via(insert(i))
    .zipWithIndex
    .runWith(Sink.foreach { case (_, chunkIndex) => println(s"Chunk $chunkIndex of source $i  done") })
}

我为每个Source添加一个索引,只是为了在写入DB的数据中使用它作为前缀。
下面是我到目前为止编写的insertFlow的代码:

def insert(srcIndex: Int): Flow[Seq[String], Unit, NotUsed] = {
  implicit val insertSession: SlickSession = slickSession
  system.registerOnTermination(() => insertSession.close())

  Flow[Seq[String]]
    .via(Slick.flowWithPassThrough { chunk =>
      (for {
        // insert data into `tests`
        _ <- InsTests ++= chunk.map(v => TestProj(s"source$srcIndex-$v"))
        // fetch last insert ID and connection ID
        queryResult <- sql"SELECT CONNECTION_ID(), LAST_INSERT_ID();".as[(Long, Long)].headOption
        _ <- queryResult match {
          case Some((connId, firstIdInChunk)) =>
            println(s"Source $srcIndex, last insert ID $firstIdInChunk, connection $connId")
            // compute IDs by hand and write to `dependent`
            val depValues = Seq.fill(ChunkSize)(s"source$srcIndex-${Random.alphanumeric.take(6).mkString}")
            val depRows =
              (firstIdInChunk to (firstIdInChunk + ChunkSize))
                .zip(depValues)
                .map { case (index, value) => DependentProj(index, value) }

            InsDependent ++= depRows
          case None => DBIO.failed(new Exception("..."))
        }
      } yield ()).transactionally
  })
}

其中,InsTestsInsDependent是Slick的TableQuery对象。slickSession为每个不同的插入创建一个新会话,定义如下:

private def slickSession = {
  val db = Database.forURL(
    url = "jdbc:mariadb://localhost:3306/test",
    user = "root",
    password = "password",
    executor = AsyncExecutor(
      name = "executor",
      minThreads = 20,
      maxThreads = 20,
      queueSize = 1000,
      maxConnections = 20
    )
  )
  val profile = slick.jdbc.MySQLProfile
  SlickSession.forDbAndProfile(db, profile)
}

问题是算法的第二步返回的最后一个插入ID重叠。每次运行此应用程序都会打印如下内容:

Source 2, last insert ID 6, connection 66
Source 1, last insert ID 5, connection 68
Source 0, last insert ID 7, connection 67
Chunk 0 of source 0  done
Chunk 0 of source 2  done
Chunk 0 of source 1  done
Source 2, last insert ID 40, connection 70
Source 0, last insert ID 26, connection 69
Source 1, last insert ID 27, connection 71
Chunk 1 of source 2  done
Chunk 1 of source 1  done
Chunk 1 of source 0  done

看起来每个Source的连接都不同,但ID重叠(源0显示7,源1显示5,源2显示2)。ID从5开始是正确的,因为我在创建表后立即添加了4个虚拟行很明显,我在dependent中看到了多个具有相同tests.id的行,这是不应该发生的。
我的理解是,最后插入的ID指的是单个连接,考虑到整个流都被 Package 在一个事务中(通过Slick的transactionally),三个不同的连接怎么可能看到重叠的ID呢?
innodb_autoinc_lock_mode=1会发生这种情况,而innodb_autoinc_lock_mode=0不会发生这种情况,这是有道理的,因为InnoDB会锁定tests,直到整个批插入操作终止。

更新在Georg回答后:对于项目中的一些其他限制,我希望解决方案能够与MariaDB 10.4兼容,据我所知,MariaDB 10.4不支持INSERT...RETURNING。此外,Slick的++=操作符对returning的支持非常差,也报告了here。我在MariaDB 10.4和10.5上测试了它,并且,根据查询日志,Slick确实执行单个INSERT INTO语句,而不是批处理语句,但在我的情况下,这不是完全可以接受的,因为我计划以流的方式写入多个行块。

虽然我也知道假设自动增量值为1并不理想,但我们确实可以控制生产设置,并且不具有多主机复制。

ecr0jaav

ecr0jaav1#

您无法根据LAST_INSERT_ID()生成后续值:
1.可能会有第二个事务在同一时间运行并回滚,因此在auto_incremented ID中会有一个间隙。
1.通过递增LAST_INSERT_ID值来迭代行数将不起作用,因为它取决于会话变量@@auto_increment_increment的值(尤其是在多主机复制中,该值不是1)。
相反,您应该使用RETURNING来获取插入行得ID:

MariaDB [test]> create table t1 (a int not null auto_increment primary key);
Query OK, 0 rows affected (0,022 sec)

MariaDB [test]> insert into t1 (a) values (1),(3),(NULL), (NULL) returning a;
+---+
| a |
+---+
| 1 |
| 3 |
| 4 |
| 5 |
+---+
4 rows in set (0,006 sec)

相关问题