我一直在为我们的项目广泛使用布尔库。几天前,我们在处理bull队列中的作业时发现问题。我们可以在bull队列中添加作业,但它无法处理该作业。当我签入taskforce时,我们注意到作业被延迟/失败,错误响应为空。我认为这是因为工作进程没有与队列连接。同样的代码可以正常工作,如果我尝试重新启动进程后,一些时间。我不确定这是否是redis或bull的问题,或者我使用它的方式。问题是我可以添加作业,但作业未处理(意外(仅在某些情况下)
示例代码段:
const BullQueue = require('bull');
class Queue {
constructor(name, connectionName = DBConstant.localRedisConnectionName) {
const options = {};
options.redis = DBConnectionUseCase.getRedisConnectionOptions(connectionName);
this._queue = new BullQueue(name, options);
this._connectionName = connectionName;
this.queueName = name;
}
async initProcessor() {
try {
//TODO: We have noticed that in some cases process does not get attached to Queue.
//We tried adding await here and check result but did not get anything. This needs to be debug.
this._queue.process((job, done) => {
this.process(job, done);
});
}catch(error) {
console.log(`Worker :: ${this.queueName} :: Exception in processor initialisation :: connectionName :: ${this._connectionName} :: Error :: ${error.message} :: ${JSON.stringify(error)}`);
}
console.log(`Worker :: ${this.queueName} :: processor initialised :: connectionName :: ${this._connectionName}`);
}
getQueueName() {
return this.queueName;
}
addJob(data, options) {
console.log(`Worker :: ${this.queueName} :: Job added in Queue :: ${this._connectionName}`);
return this._queue.add(data, options);
}
}
class FetchCustomerData extends Queue{
constructor() {
super(QUEUE_NAME);
}
/**
*
*/
static getInstance() {
if(!queueInstance) {
queueInstance = new FetchCustomerData();
}
return queueInstance;
}
/**
*
*/
initDefaultJob() {
const data = {};
const options = {
// This cron will run in everyday at 12:30AM ISE
repeat: {
cron: '0 20 * * *'
}
};
this.addJob(data, Object.assign({},options, constant.BULL_JOB_OPTIONS));
console.log(`Worker :: ${QUEUE_NAME} :: initial job added :: options :: ${JSON.stringify(Object.assign({},options, constant.BULL_JOB_OPTIONS))}`);
}
/**
*
*/
async process(job, done) {
try {
...
return done(nu;;);
}catch(error) {
console.error(`FetchActivationalUCC.process :: ${uuid} :: Exception :: ${JSON.stringify(error)}`);
return done(error);
}
}
}
我如何使用它:
FetchCustomerData.initProcessor();
const res = await FetchCustomerData.addJob({"key": 123});
Package 清单:
"bull": "3.7.0"
"ioredis": "4.9.0"
"node": v10.15.0
暂无答案!
目前还没有任何答案,快来回答吧!