在上一个作业完成之前不处理下一个作业(redis?)

rqqzpn5f  于 2021-06-10  发布在  Redis
关注(0)|答案(3)|浏览(381)

基本上,每个客户都有一个 clientId 与它们相关联的消息---可以推送消息,重要的是,在第一条消息完成处理之前,不会处理来自同一客户端的第二条消息(即使客户端可以连续发送多条消息,并且它们是有序的,并且发送消息的多个客户端最好不要相互干扰)。而且,重要的是,一份工作不应该被处理两次。
我认为使用redis可能可以解决这个问题,我开始使用bull库进行一些快速原型设计,但我显然做得不好,我希望有人知道如何继续。

这就是我目前所尝试的:

使用 clientId 作为作业名称。
在两个独立进程上等待大量随机时间时消耗作业。
我尝试添加我正在使用的库提供的默认锁定( bull )但是它锁定jobid,jobid对于每个作业都是唯一的,而不是clientid。

我希望发生什么:

其中一个消费者不能接受同样的工作 clientId 直到上一个完成处理。
但是,他们应该能够从不同的服务器获取项目 clientId 在没有问题的情况下(异步地)并行(我还没走到这一步,我现在只处理一个 clientId )

我得到的是:

两个消费者都从队列中消费尽可能多的项目,而不必等待队列中的前一个项目 clientId 待完成。

redis是这个工作的合适工具吗?

示例代码

// ./setup.ts
import Queue from 'bull';
import * as uuid from 'uuid';

// Check that when a message is taken from a place, no other message is taken

// TO do that test, have two processes that process messages and one that sets messages, and make the job take a long time

// queue for each room https://stackoverflow.com/questions/54178462/how-does-redis-pubsub-subscribe-mechanism-works/54243792#54243792
// https://groups.google.com/forum/#!topic/redis-db/R09u__3Jzfk

// Make a job not be called stalled, waiting enough time https://github.com/OptimalBits/bull/issues/210#issuecomment-190818353

export async function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
export interface JobData {
  id: string;
  v: number;
}
export const queue = new Queue<JobData>('messages', 'redis://127.0.0.1:6379');

queue.on('error', (err) => {
  console.error('Uncaught error on queue.', err);
  process.exit(1);
});

export function clientId(): string {
  return uuid.v4();
}

export function randomWait(minms: number, maxms: number): Promise<void> {
  const ms = Math.random() * (maxms - minms) + minms;
  return sleep(ms);
}

// Make a job not be called stalled, waiting enough time https://github.com/OptimalBits/bull/issues/210#issuecomment-190818353
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
//@ts-ignore
queue.LOCK_RENEW_TIME = 5 * 60 * 1000;
// ./create.ts
import { queue, randomWait } from './setup';

const MIN_WAIT = 300;
const MAX_WAIT = 1500;
async function createJobs(n = 10): Promise<void> {
  await randomWait(MIN_WAIT, MAX_WAIT);
  // always same Id
  const clientId = Math.random() > 1 ? 'zero' : 'one';
  for (let index = 0; index < n; index++) {
    await randomWait(MIN_WAIT, MAX_WAIT);
    const job = { id: clientId, v: index };
    await queue.add(clientId, job).catch(console.error);
    console.log('Added job', job);
  }
}

export async function create(nIds = 10, nItems = 10): Promise<void> {
  const jobs = [];
  await randomWait(MIN_WAIT, MAX_WAIT);
  for (let index = 0; index < nIds; index++) {
    await randomWait(MIN_WAIT, MAX_WAIT);
    jobs.push(createJobs(nItems));
    await randomWait(MIN_WAIT, MAX_WAIT);
  }
  await randomWait(MIN_WAIT, MAX_WAIT);
  await Promise.all(jobs)
  process.exit();
}

(function mainCreate(): void {
  create().catch((err) => {
    console.error(err);
    process.exit(1);
  });
})();
// ./consume.ts
import { queue, randomWait, clientId } from './setup';

function startProcessor(minWait = 5000, maxWait = 10000): void {
  queue
    .process('*', 100, async (job) => {
      console.log('LOCKING: ', job.lockKey());
      await job.takeLock();
      const name = job.name;
      const processingId = clientId().split('-', 1)[0];
      try {
        console.log('START: ', processingId, '\tjobName:', name);
        await randomWait(minWait, maxWait);
        const data = job.data;
        console.log('PROCESSING: ', processingId, '\tjobName:', name, '\tdata:', data);
        await randomWait(minWait, maxWait);
        console.log('PROCESSED: ', processingId, '\tjobName:', name, '\tdata:', data);
        await randomWait(minWait, maxWait);
        console.log('FINISHED: ', processingId, '\tjobName:', name, '\tdata:', data);
      } catch (err) {
        console.error(err);
      } finally {
        await job.releaseLock();
      }
    })
    .catch(console.error); // Catches initialization
}

startProcessor();

这是使用3个不同的进程运行的,您可以这样调用它们(尽管我使用不同的选项卡来更清楚地查看正在发生的事情)

npx ts-node consume.ts & 
npx ts-node consume.ts &
npx ts-node create.ts &
7ajki6be

7ajki6be1#

我不熟悉node.js。但对于redis,我会试试这个,
假设你有客户端1,客户端2,它们都是事件的发布者。你有三台机器,消费者1,消费者2,消费者3。
在redis中建立一个任务列表,例如job\u list。
客户机以特定的形式将(lpush)作业放入此作业列表,如“客户机1:[jobcontent]”、“客户机2:[jobcontent]”
每个使用者都会阻塞地取出作业(redis的rpop命令)并进行处理。例如,consumer\ u 1取出一个作业,content是client\ u 1:[jobcontent]。它解析内容并识别它来自客户端1。然后它想检查是否有其他消费者已经在处理客户机1,如果没有,它将锁定密钥以指示它正在处理客户机1。
它继续使用redis setnx命令(如果密钥不存在则设置)设置“client\u 1\u processing”的密钥,内容为“consumer\u 1”,并具有适当的超时。例如,任务通常需要一分钟才能完成,您将密钥的超时设置为五分钟,以防consumer\u 1崩溃并无限期地保持锁。
如果setnx返回0,则表示无法获取client_1的锁(有人正在处理client_1的作业)。然后它使用redis lpush命令将作业(值为“client\u 1:[jobcontent]”)返回到作业列表的左侧。然后它可能会等待一段时间(睡眠几秒钟),然后从列表的右侧rpop另一个任务。如果此时setnx返回1,则consumer\ 1获取锁。它继续处理作业,完成后删除“client\u 1\u processing”的密钥,释放锁。然后它继续rpop另一个作业,以此类推。
需要考虑的事项:
工作清单是不公平的,例如,以前的工作可能会在以后处理
锁紧部分有点简陋,但已经足够了。

piv4azn7

piv4azn73#


我想出了另一种方法来让任务井然有序。
为每个客户(生产者)建立一个列表。像“客户机列表”一样,将作业推到列表的左侧。将所有客户端名称保存在“client\u names\u list”列表中,值为“client\u 1”、“client\u 2”等。
对于每个客户机(处理器),迭代“客户机名称\u列表”,例如,客户机\u 1得到一个“客户机\u 1”,检查客户机\u 1的密钥是否被锁定(有人已经在处理客户机\u 1的任务),如果没有,右键弹出客户机\u 1列表中的一个值(作业),并锁定客户机\u 1。如果客户机\1被锁定,(可能会休眠一秒钟),然后迭代到下一个客户机,例如“客户机\2”,并检查密钥等等。
这样,每个客户机(任务生产者)的任务都按其输入顺序进行处理。

相关问题