我有一个cron工作,每5秒运行一次。每次它运行时,我都想做一些检查,并根据此安排,在特定时间安排一个新的作业,如果该作业之前没有安排。为此,我创建了一个包含以下代码的worker:
scheduleJob("worloads cron", "*/5 * * * * *", async function () {
console.log("This job runs every 5 seconds");
if (!connection) { // create DB connection if it doesn't exists
Logger.info("Creating workloads DB connection.");
connection = await createConnection({
type: "postgres",
url: process.env.DATABASE_URL,
logging: true, // todo: disable in production
});
return Promise.resolve();
} else if (connection && connection.isConnected) {
// Fetch some data that needs to be scheduled
const data: Workload[] = await connection.query(`
SELECT workload."status"
from "workload" workload;';
`);
console.log("jobs", scheduledJobs);
for (const workload of data) {
if (scheduledJobs[String(workload.id)]) {
console.log(`Job with id ${workload.id} exists!`);
} else {
Logger.info(
`Job with id ${workload.id} does not exist. Creating new scheduled job!`
);
//after this point my code doesn't execute and the job is not being scheduled
const filter = await InteractionFilter.findOne({
where: { id: workload.interactionFilter },
});
Logger.info("FILTER", filter); // this is no being logged
const qb = await connection
.getRepository(Interaction)
.createQueryBuilder("interaction")
.leftJoinAndSelect(
`interaction.${filter.integrationSystem.toLowerCase()}_metadata`,
"metadata"
)
.where("interaction.system = :system", {
system: filter.integrationSystem,
})
.orderBy("interaction.createdAt", "ASC")
.getMany();
const filter_values = await connection
.getRepository(InteractionFilter_Metadata)
.createQueryBuilder("filtervalue")
.leftJoinAndSelect("filtervalue.metadata", "metadata")
.where("filtervalue.filterId = :filterId", {
filterId: workload.interactionFilter,
})
.distinct()
.getMany();
Logger.info("FILTER VALUES", filter_values);
// console.log(qb);
scheduleJob(
String(workload.id),
dayjs(workload.executeTime).toDate(),
function () {
// todo: here create the interactions
console.log("TEST");
}
);
console.log("SCHEDULE TASK AFTER", scheduledJobs);
}
}
return Promise.resolve();
}
});
expose(() => true);
这是我的输出,正如你所看到的,在我们检查作业没有被调度之后,函数没有执行代码,而是返回到开始。
INFO [12-05-2023 16:22:40]: Creating workloads DB connection.
This job runs every 3 seconds
query:
SELECT workload."status"
from "workload" workload;
jobs {
'worloads crong': Job {
}
}
INFO [12-05-2023 16:22:50]: Job with id 53 does not exist. Creating new scheduled job!
// HERE SINCE THE JOB DOESN'T EXIST IT SHOULD BE SCHEDULED INSTEAD I GET THE FOLLOWING OUTPUT:
This job runs every 3 seconds
query:
SELECT workload."status"
from "workload" workload;
jobs {
'worloads crong': Job {
...
}
}
INFO [12-05-2023 16:23:00]: Job with id 53 does not exist. Creating new scheduled job!
我唯一能够让它工作的方法是通过评论所有的承诺这样:
scheduleJob("worloads crong", "*/10 * * * * *", async function () {
console.log("This job runs every 3 seconds");
if (!connection) {
Logger.info("Creating workloads DB connection.");
connection = await createConnection({
type: "postgres",
url: process.env.DATABASE_URL,
logging: true,
});
return Promise.resolve();
} else if (connection && connection.isConnected) {
const data: Workload[] = await connection.query(`
SELECT workload."status"
from "workload" workload;';
`);
console.log("jobs", scheduledJobs);
for (const workload of data) {
if (scheduledJobs[String(workload.id)]) {
console.log(`Job with id ${workload.id} exists!`);
} else {
Logger.info(
`Job with id ${workload.id} does not exist. Creating new scheduled job!`
);
scheduleJob(
String(workload.id),
dayjs(workload.executeTime).toDate(),
function () {
// todo: here create the interactions
console.log("TEST");
}
);
console.log("SCHEDULE TASK AFTER", scheduledJobs);
}
}
return Promise.resolve();
}
});
现在我得到了我想要的输出:
INFO [12-05-2023 16:27:40]: Creating workloads DB connection.
This job runs every 3 seconds
query:
SELECT workload."status"
from "workload" workload;
jobs {
'worloads crong': Job {
}
}
SCHEDULE TASK AFTER {
'53': Job {
...
},
'worloads crong': Job {
...
}
}
INFO [12-05-2023 16:27:50]: Job with id 53 does not exist. Creating new scheduled job!
This job runs every 3 seconds
query:
SELECT workload."status"
from "workload" workload;
jobs {
'53': Job {
...
},
'worloads crong': Job {
...
}
}
Job with id 53 exists!
有人有办法解决这个问题吗?
1条答案
按热度按时间sirbozc51#
很难说到底出了什么问题,但我可以为你的调查提出建议。
您所指示的promise未运行可能是错误的。我建议你将异步操作 Package 在try-catch中,以便能够捕获任何可能弹出的错误。一般来说,优雅地处理错误是一个很好的实践。
附带说明,要小心短超时的计划作业(如您正在使用的5),特别是当您有异步操作时,它可能会滞后于其他快速操作。