我们能阻止从db表读取独占锁定的行吗?

6mw9ycah  于 2021-06-15  发布在  Mysql
关注(0)|答案(3)|浏览(396)

我的mysql(innodb)数据库中有一个表employee,它有以下列,
内部职员id(自动递增主键)
外部员工id
名称
性别
导出(布尔字段)
在分布式系统中,我希望确保集群中的多个进程每次将导出列设置为false时都从表中读取前100个不同的行。在计算过程中,进程读取的行应该保持锁定,这样如果process1读取行1-100,process2就不能看到1-100中的行,然后应该拾取下一个可用的100行。
为此,我尝试使用悲观的写锁,但它们似乎不起作用。它们确实阻止多个进程同时更新,但多个进程可以读取相同的锁定行。
我试着使用以下java代码,

Query query = entityManager.createNativeQuery("select * from employee " +
        "where exported = 0 limit 100 for update");
    List<Employee> employeeListLocked = query.getResultList();
cczfrluj

cczfrluj1#

有几种方法可以阻止读取:
要首先更新表的会话执行以下操作:

LOCK TABLES employee WRITE;

这将获取表上的独占元数据锁。然后其他会话被阻塞,即使它们只尝试读取该表。它们必须等待元数据锁。看到了吗https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html 了解更多信息。
表锁的缺点是它们会锁定整个表。无法使用此选项锁定单个行或行集。
另一种解决方案是,必须对所有读取进行编码,以要求共享锁:

SELECT ... FROM employee WHERE ... LOCK IN SHARE MODE;

mysql 8.0更改了语法,但工作方式相同:

SELECT ... FROM employee WHERE ... FOR SHARE;

这些不是元数据锁,而是行锁。因此可以锁定单个行或行集。
对某些行的共享锁的请求不会与这些行上的其他共享锁冲突,但如果这些行上有独占锁,则select for share将等待。反之亦然——如果在未提交的行上有任何select for share,则独占锁的请求将等待。
这种方法的缺点是,只有当所有读取该表的查询都具有for share选项时,它才能工作。
尽管如此,我发布这篇文章只是为了直接回答你的问题。我确实认为perkilis的回答中描述的系统是好的。我最近实现了一个这样的系统,而且很有效。
有时,您所考虑的实现并不是最佳解决方案,您需要考虑另一种方法来解决问题。

qltillow

qltillow2#

-- In a transaction by itself:
UPDATE t
    SET who_has = $me   -- some indicate of the process locking the rows
    WHERE who_has IS NULL
    LIMIT 100;

-- Grab some or all rows that you have and process them.
-- You should not need to lock them further (at least not for queue management)
SELECT ... WHERE who_has = $me ...

-- Eventually, release them, either one at a time, or all at once.
-- Here's the bulk release:
UPDATE t SET who_has = NULL
    WHERE who_has = $me
-- Again, this UPDATE is in its own transaction.

请注意,这种通用机制对“处理”项所需的时间没有限制。
另外,额外的 who_has 列可以帮助您在发生崩溃时不释放项目。它应该由抓取项目的时间戳来扩充。cron作业(或等效作业)应该四处查找任何被锁定“太长”的未处理项。

ars1skjm

ars1skjm3#

您可以在表中添加一个新列,例如,一个名为'processed'(布尔字段)的列,并用false值更新所有记录

update EMPLOYEE set processed = 0;

当一个进程启动时,在同一事务中,您可以选择更新,然后在这100行中将处理的列更新为1。

Query query = entityManager.createNativeQuery("select * from employee " +
            "where exported = 0 and processed = 0
    order by internal_employee_id desc  limit 100 for update");
        List<Employee> employeeListLocked = query.getResultList();

更新这100行

UPDATE EMPLOYEE eUpdate INNER JOIN (select internal_employee_id
       from EMPLOYEE where exported = 0 and processed = 0
       order by internal_employee_id desc limit 100) e
     ON eUpdate.internal_employee_id = e.internal_employee_id
       SET eUpdate.processed = 1 ;

然后,下一个进程将不处理相同的列表

相关问题