在mysql中使用并发worker进行sql原子读取和更新

eit6fx6z  于 2021-07-26  发布在  Java
关注(0)|答案(2)|浏览(345)

假设我有多个worker可以同时对一个mysql表进行读写(例如。 jobs ). 每个工人的任务是:
找到最老的 QUEUED 工作
将其状态设置为 RUNNING 返回相应的id。
请注意,可能没有任何限定(即。 QUEUED )工作人员运行步骤1时的作业。
到目前为止,我有以下伪代码。我想我需要取消( ROLLBACK )如果第1步没有返回任何作业,则返回事务。在下面的代码中我将如何做到这一点?

BEGIN TRANSACTION;

# Update the status of jobs fetched by this query:

SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)

UPDATE jobs
SET status="RUNNING"

# HERE: Not sure how to make this conditional on the previous ID

# WHERE id = <ID from the previous SELECT>

COMMIT;
c9qzyr3d

c9qzyr3d1#

我这周正在实施一个和你的案子非常相似的计划。一组工人,每个人抓住一组行中的“下一行”进行工作。
伪代码是这样的:

BEGIN;

SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;

UPDATE mytable SET status = 'RUNNING' WHERE id = @id;

COMMIT;

使用 FOR UPDATE 重要的是要避免比赛条件,即一个以上的工人试图抢同一排。
看到了吗https://dev.mysql.com/doc/refman/8.0/en/select-into.html 有关 SELECT ... INTO .

wgx48brx

wgx48brx2#

你想要什么还不太清楚。但假设你的任务是:找到下一个 QUEUED 工作。将其状态设置为 RUNNING 并选择相应的id。
在单线程环境中,只需使用代码即可。将所选id提取到应用程序代码中的变量中,并将其传递给where子句中的update查询。您甚至不需要事务,因为只有一个书面语句。您可以在sqlscript中进行模拟。
假设这是您当前的状态:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | QUEUED   |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

您想启动下一个排队作业(id=2)。

SET @id_for_update = (
  SELECT id
  FROM jobs
  WHERE status = 'QUEUED'
  ORDER BY id
  LIMIT 1
);

UPDATE jobs
SET status="RUNNING"
WHERE id = @id_for_update;

SELECT @id_for_update;

你会得到

@id_for_update
2

从最后一个选择。表将具有以下状态:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

db fiddle视图
如果有多个启动作业的进程,则需要使用 FOR UPDATE . 但这可以避免使用 LAST_INSERT_ID() :
从上述状态开始,作业2已在运行:

UPDATE jobs
SET status = 'RUNNING',
    id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

SELECT LAST_INSERT_ID();

您将获得:

| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3                | 1           |

新的状态是:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | RUNNING  |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

db fiddle视图
如果update语句不影响任何行(没有排队的行) ROW_COUNT()0 .
可能会有一些风险,我不知道-但这也不是我真正将如何处理这一点。我宁愿把更多的信息储存在 jobs table。简单示例:

CREATE TABLE jobs (
  id INT auto_increment primary key,
  created_at timestamp not null default now(),
  updated_at timestamp not null default now() on update now(),
  status varchar(50) not null default 'QUEUED',
  process_id varchar(50) null default null
);

UPDATE jobs
SET status = 'RUNNING',
    process_id = 'some_unique_pid'    
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

现在一个正在运行的作业属于一个特定的进程,您只需使用

SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

你甚至想知道更多的信息。 queued_at , started_at , finished_at .

相关问题