NodeJS 我无法使用节点调度在已调度的cron作业中调度作业

qc6wkl3g  于 2023-05-28  发布在  Node.js
关注(0)|答案(1)|浏览(192)

我有一个cron工作,每5秒运行一次。每次它运行时,我都想做一些检查,并根据此安排,在特定时间安排一个新的作业,如果该作业之前没有安排。为此,我创建了一个包含以下代码的worker:

scheduleJob("worloads cron", "*/5 * * * * *", async function () {
  console.log("This job runs every 5 seconds");

  if (!connection) { // create DB connection if it doesn't exists
    Logger.info("Creating workloads DB connection.");
    connection = await createConnection({
      type: "postgres",
      url: process.env.DATABASE_URL,
      logging: true, // todo: disable in production
    });
    return Promise.resolve();
  } else if (connection && connection.isConnected) {

    // Fetch some data that needs to be scheduled
    const data: Workload[] = await connection.query(`
       SELECT workload."status"
       from "workload" workload;';
  `);

    console.log("jobs", scheduledJobs);

    for (const workload of data) {
      if (scheduledJobs[String(workload.id)]) {
        console.log(`Job with id ${workload.id} exists!`);
      } else {
        Logger.info(
          `Job with id ${workload.id} does not exist. Creating new scheduled job!`
        );
        //after this point my code doesn't execute and the job is not being scheduled
        const filter = await InteractionFilter.findOne({
          where: { id: workload.interactionFilter },
        });

        Logger.info("FILTER", filter); // this is no being logged
        const qb = await connection
          .getRepository(Interaction)
          .createQueryBuilder("interaction")
          .leftJoinAndSelect(
            `interaction.${filter.integrationSystem.toLowerCase()}_metadata`,
            "metadata"
          )
          .where("interaction.system = :system", {
            system: filter.integrationSystem,
          })
          .orderBy("interaction.createdAt", "ASC")
          .getMany();

        const filter_values = await connection
          .getRepository(InteractionFilter_Metadata)
          .createQueryBuilder("filtervalue")
          .leftJoinAndSelect("filtervalue.metadata", "metadata")
          .where("filtervalue.filterId = :filterId", {
            filterId: workload.interactionFilter,
          })
          .distinct()
          .getMany();

        Logger.info("FILTER VALUES", filter_values);

        // console.log(qb);

        scheduleJob(
          String(workload.id),
          dayjs(workload.executeTime).toDate(),
          function () {
            // todo: here create the interactions
            console.log("TEST");
          }
        );
        console.log("SCHEDULE TASK AFTER", scheduledJobs);
      }
    }

    return Promise.resolve();
  }
});

expose(() => true);

这是我的输出,正如你所看到的,在我们检查作业没有被调度之后,函数没有执行代码,而是返回到开始。

INFO [12-05-2023 16:22:40]: Creating workloads DB connection.
This job runs every 3 seconds
query: 
       SELECT workload."status"
       from "workload" workload;

jobs {
  'worloads crong': Job {

  }
}
INFO [12-05-2023 16:22:50]: Job with id 53 does not exist. Creating new scheduled job!

// HERE SINCE THE JOB DOESN'T EXIST IT SHOULD BE SCHEDULED INSTEAD I GET THE FOLLOWING OUTPUT:

This job runs every 3 seconds
query: 
       SELECT workload."status"
       from "workload" workload;

jobs {
  'worloads crong': Job {
    ...
  }
}
INFO [12-05-2023 16:23:00]: Job with id 53 does not exist. Creating new scheduled job!

我唯一能够让它工作的方法是通过评论所有的承诺这样:

scheduleJob("worloads crong", "*/10 * * * * *", async function () {
  console.log("This job runs every 3 seconds");
  if (!connection) {
    Logger.info("Creating workloads DB connection.");
    connection = await createConnection({
      type: "postgres",
      url: process.env.DATABASE_URL,
      logging: true, 
    });
    return Promise.resolve();
  } else if (connection && connection.isConnected) {
            const data: Workload[] = await connection.query(`
       SELECT workload."status"
       from "workload" workload;';
  `);

    console.log("jobs", scheduledJobs);

    for (const workload of data) {
      if (scheduledJobs[String(workload.id)]) {
        console.log(`Job with id ${workload.id} exists!`);
      } else {
        Logger.info(
          `Job with id ${workload.id} does not exist. Creating new scheduled job!`
        );

        scheduleJob(
          String(workload.id),
          dayjs(workload.executeTime).toDate(),
          function () {
            // todo: here create the interactions
            console.log("TEST");
          }
        );
        console.log("SCHEDULE TASK AFTER", scheduledJobs);
      }
    }

    return Promise.resolve();
  }
});

现在我得到了我想要的输出:

INFO [12-05-2023 16:27:40]: Creating workloads DB connection.
    This job runs every 3 seconds
    query: 
           SELECT workload."status"
           from "workload" workload;
    
    jobs {
      'worloads crong': Job {
      }
    }
    SCHEDULE TASK AFTER {
      '53': Job {
...
      },
      'worloads crong': Job {
...      
}
    }
    INFO [12-05-2023 16:27:50]: Job with id 53 does not exist. Creating new scheduled job!
    This job runs every 3 seconds
        query: 
           SELECT workload."status"
           from "workload" workload;
    jobs {
      '53': Job {
       ...
      },
      'worloads crong': Job {
       ...
      }
    }
    Job with id 53 exists!

有人有办法解决这个问题吗?

sirbozc5

sirbozc51#

很难说到底出了什么问题,但我可以为你的调查提出建议。
您所指示的promise未运行可能是错误的。我建议你将异步操作 Package 在try-catch中,以便能够捕获任何可能弹出的错误。一般来说,优雅地处理错误是一个很好的实践。
附带说明,要小心短超时的计划作业(如您正在使用的5),特别是当您有异步操作时,它可能会滞后于其他快速操作。

相关问题