基本上,每个客户都有一个 clientId
与它们相关联的消息---可以推送消息,重要的是,在第一条消息完成处理之前,不会处理来自同一客户端的第二条消息(即使客户端可以连续发送多条消息,并且它们是有序的,并且发送消息的多个客户端最好不要相互干扰)。而且,重要的是,一份工作不应该被处理两次。
我认为使用redis可能可以解决这个问题,我开始使用bull库进行一些快速原型设计,但我显然做得不好,我希望有人知道如何继续。
这就是我目前所尝试的:
使用 clientId
作为作业名称。
在两个独立进程上等待大量随机时间时消耗作业。
我尝试添加我正在使用的库提供的默认锁定( bull
)但是它锁定jobid,jobid对于每个作业都是唯一的,而不是clientid。
我希望发生什么:
其中一个消费者不能接受同样的工作 clientId
直到上一个完成处理。
但是,他们应该能够从不同的服务器获取项目 clientId
在没有问题的情况下(异步地)并行(我还没走到这一步,我现在只处理一个 clientId
)
我得到的是:
两个消费者都从队列中消费尽可能多的项目,而不必等待队列中的前一个项目 clientId
待完成。
redis是这个工作的合适工具吗?
示例代码
// ./setup.ts
import Queue from 'bull';
import * as uuid from 'uuid';
// Check that when a message is taken from a place, no other message is taken
// TO do that test, have two processes that process messages and one that sets messages, and make the job take a long time
// queue for each room https://stackoverflow.com/questions/54178462/how-does-redis-pubsub-subscribe-mechanism-works/54243792#54243792
// https://groups.google.com/forum/#!topic/redis-db/R09u__3Jzfk
// Make a job not be called stalled, waiting enough time https://github.com/OptimalBits/bull/issues/210#issuecomment-190818353
export async function sleep(ms: number): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
export interface JobData {
id: string;
v: number;
}
export const queue = new Queue<JobData>('messages', 'redis://127.0.0.1:6379');
queue.on('error', (err) => {
console.error('Uncaught error on queue.', err);
process.exit(1);
});
export function clientId(): string {
return uuid.v4();
}
export function randomWait(minms: number, maxms: number): Promise<void> {
const ms = Math.random() * (maxms - minms) + minms;
return sleep(ms);
}
// Make a job not be called stalled, waiting enough time https://github.com/OptimalBits/bull/issues/210#issuecomment-190818353
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
//@ts-ignore
queue.LOCK_RENEW_TIME = 5 * 60 * 1000;
// ./create.ts
import { queue, randomWait } from './setup';
const MIN_WAIT = 300;
const MAX_WAIT = 1500;
async function createJobs(n = 10): Promise<void> {
await randomWait(MIN_WAIT, MAX_WAIT);
// always same Id
const clientId = Math.random() > 1 ? 'zero' : 'one';
for (let index = 0; index < n; index++) {
await randomWait(MIN_WAIT, MAX_WAIT);
const job = { id: clientId, v: index };
await queue.add(clientId, job).catch(console.error);
console.log('Added job', job);
}
}
export async function create(nIds = 10, nItems = 10): Promise<void> {
const jobs = [];
await randomWait(MIN_WAIT, MAX_WAIT);
for (let index = 0; index < nIds; index++) {
await randomWait(MIN_WAIT, MAX_WAIT);
jobs.push(createJobs(nItems));
await randomWait(MIN_WAIT, MAX_WAIT);
}
await randomWait(MIN_WAIT, MAX_WAIT);
await Promise.all(jobs)
process.exit();
}
(function mainCreate(): void {
create().catch((err) => {
console.error(err);
process.exit(1);
});
})();
// ./consume.ts
import { queue, randomWait, clientId } from './setup';
function startProcessor(minWait = 5000, maxWait = 10000): void {
queue
.process('*', 100, async (job) => {
console.log('LOCKING: ', job.lockKey());
await job.takeLock();
const name = job.name;
const processingId = clientId().split('-', 1)[0];
try {
console.log('START: ', processingId, '\tjobName:', name);
await randomWait(minWait, maxWait);
const data = job.data;
console.log('PROCESSING: ', processingId, '\tjobName:', name, '\tdata:', data);
await randomWait(minWait, maxWait);
console.log('PROCESSED: ', processingId, '\tjobName:', name, '\tdata:', data);
await randomWait(minWait, maxWait);
console.log('FINISHED: ', processingId, '\tjobName:', name, '\tdata:', data);
} catch (err) {
console.error(err);
} finally {
await job.releaseLock();
}
})
.catch(console.error); // Catches initialization
}
startProcessor();
这是使用3个不同的进程运行的,您可以这样调用它们(尽管我使用不同的选项卡来更清楚地查看正在发生的事情)
npx ts-node consume.ts &
npx ts-node consume.ts &
npx ts-node create.ts &
3条答案
按热度按时间7ajki6be1#
我不熟悉node.js。但对于redis,我会试试这个,
假设你有客户端1,客户端2,它们都是事件的发布者。你有三台机器,消费者1,消费者2,消费者3。
在redis中建立一个任务列表,例如job\u list。
客户机以特定的形式将(lpush)作业放入此作业列表,如“客户机1:[jobcontent]”、“客户机2:[jobcontent]”
每个使用者都会阻塞地取出作业(redis的rpop命令)并进行处理。例如,consumer\ u 1取出一个作业,content是client\ u 1:[jobcontent]。它解析内容并识别它来自客户端1。然后它想检查是否有其他消费者已经在处理客户机1,如果没有,它将锁定密钥以指示它正在处理客户机1。
它继续使用redis setnx命令(如果密钥不存在则设置)设置“client\u 1\u processing”的密钥,内容为“consumer\u 1”,并具有适当的超时。例如,任务通常需要一分钟才能完成,您将密钥的超时设置为五分钟,以防consumer\u 1崩溃并无限期地保持锁。
如果setnx返回0,则表示无法获取client_1的锁(有人正在处理client_1的作业)。然后它使用redis lpush命令将作业(值为“client\u 1:[jobcontent]”)返回到作业列表的左侧。然后它可能会等待一段时间(睡眠几秒钟),然后从列表的右侧rpop另一个任务。如果此时setnx返回1,则consumer\ 1获取锁。它继续处理作业,完成后删除“client\u 1\u processing”的密钥,释放锁。然后它继续rpop另一个作业,以此类推。
需要考虑的事项:
工作清单是不公平的,例如,以前的工作可能会在以后处理
锁紧部分有点简陋,但已经足够了。
kmpatx3s2#
更新
piv4azn73#
我想出了另一种方法来让任务井然有序。
为每个客户(生产者)建立一个列表。像“客户机列表”一样,将作业推到列表的左侧。将所有客户端名称保存在“client\u names\u list”列表中,值为“client\u 1”、“client\u 2”等。
对于每个客户机(处理器),迭代“客户机名称\u列表”,例如,客户机\u 1得到一个“客户机\u 1”,检查客户机\u 1的密钥是否被锁定(有人已经在处理客户机\u 1的任务),如果没有,右键弹出客户机\u 1列表中的一个值(作业),并锁定客户机\u 1。如果客户机\1被锁定,(可能会休眠一秒钟),然后迭代到下一个客户机,例如“客户机\2”,并检查密钥等等。
这样,每个客户机(任务生产者)的任务都按其输入顺序进行处理。