java—如何将后台线程(t1)中抛出的异常通知给调度程序线程(t2)

vtwuwzda  于 2021-07-03  发布在  Java
关注(0)|答案(1)|浏览(364)

我有一个时间表,拿起定期的时间间隔的工作,每个工作需要很长的时间来处理。当一个作业正在处理时,我们发送heartbeat,它每5分钟更新一次后台线程上数据库中的最后一个更新时间列,直到作业完全完成。
问题:如果后台线程在发送心跳信号时抛出运行时异常,如何通知正在处理runjob方法中的作业的调度程序线程立即中止当前作业并移动到作业列表中的下一个作业?
jobrunner.class类

@Scheduled
    public void pipelineRunner() {
        List<Job> jobs = getJobs(); //fetches tasks from database
        for(Job job: jobs){
            try{
                heartBeatSender.startHeartBeat(job); // updates last_update_time column for the current job in database on background thread
                runJob(job); // contains logic of processing a job, how to abort current job  it is processing if heartbeat sender throws a runtime exception or how to notify or make scheduler thread throw exception so that it can move to next job in the list?
            }catch(Exception e){}
            finally{
                heartBeatSender.stopHeartBeat(); //stops updating last_update_time col.
            }
        }
    }

heartbeatsender.class类

public void startHeartBeat(Job job) {
        Runnable heartBeat = () -> {
            if (job != null) {
                job.setLastUpdateTime(System.currentTimeMillis());
                jobRepository.save(job);//runtime exception can be thrown here.
            }
        }
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(heartBeat, 0L, heartBeatInterval, TimeUnit.MILLISECONDS);  //heartBeatInterval every 5mins
    }
zdwk9cvp

zdwk9cvp1#

你发布的代码有点不一致,因为它看起来 HeartbeatSender 可以将心跳发送到多个作业,但客户端代码总是只触发一个。但是,如果您有另一个类也使用这个 HeartbeatSender ,您将计划两个心跳运行,那么它将如何决定哪一个 stopHeartbeat() 会取消吗?更不用说我也不知道它一开始是如何取消它的,因为它无法跟踪超出时间的计划任务 startHeartbeat() 打电话。
所以你要做的就是 HeartbeatSender 按作业跟踪它的心跳,这样就可以逐个取消它们。

class HeartbeatSender {
  private final Map<Job, ScheduledFuture<?>> heartbeats = new HashMap<>();
  // you can create a new executor in start, but I don't see why
  private final ScheduledExecutorService heartbeatExecutor = newSingleThreadScheduledExecutor();

  public void startHeartbeat(Job job) {
    Runnable heartBeat= ...;
    ScheduledFuture<?> schedule = executor.scheduleAtFixedRate(heartBeat, 0L, heartBeatInterval, TimeUnit.MILLISECONDS);
    heartbeats.put(job, schedule); // remember so you can cancel later
  }

  public void stopHeartbeat(Job job) {
    ScheduledFuture<?> schedule = heartbeats.remove(job);
    if (schedule != null) {
      schedule.cancel(false);
    }
  }
}

现在你可以通过作业来停止心跳,所以你只需要在客户端代码中这样做。

for(Job job: jobs){
  try {
    heartBeatSender.startHeartBeat(job);
    runJob(job);
  } catch(Exception e) {
  } finally{
    heartBeatSender.stopHeartBeat(job); // stops heartbeat for this job
  }
}

相关问题