java 如何在Spring Batch RetryContext中设置值

bkhjykvo  于 2023-06-20  发布在  Java
关注(0)|答案(1)|浏览(105)

我有以下批处理作业的步骤定义:

@Autowired
  RetryContextCache retryContextCache;

  @Bean
  public Step correctionStep(JpaTransactionManager transactionManager) {

    return new StepBuilder("correction-step", jobRepository)
        .<String, List<BookingInfo>>chunk(10, transactionManager)
        .reader(customItemReader())
        .processor(correctionProcessor())
        .writer(customItemWriter)
        .taskExecutor(correctionTaskExecutor())
        .faultTolerant()
        .retryPolicy(retryPolicy())
        .backOffPolicy(exponentialBackOffPolicy())
        .retryContextCache(retryContextCache)
        .skipPolicy(skipPolicy())
        .build();
  }

我有RetryContextRetryContextCache的bean定义:

@Bean
  RetryContextCache retryContextCache() {
    return new MapRetryContextCache();
  }

  @Bean
  RetryContext retryContext() {
    return new RetryContextSupport(null);
  }

现在,我在processor中使用它们,如下所示:

@Component
public class CorrectionProcessor implements ItemProcessor <String, List <BookingInfo>> {

  @Autowired
  RetryContextCache retryContextCache;

  @Autowired
  RetryContext retryContext;

  public List<BookingInfo> process(String bookingId) throws Exception {

    List<BookingInfo> list = new ArrayList <> ();

    if (retryContextCache.containsKey(bookingId)) {
      list = (List<BookingInfo>) retryContextCache.get(bookingId).getAttribute(bookingId);
    } else {
      // fetch and populate list from database.
    }

    try {
      // do something with the list.
    } catch (Exception e) {
      // modify something in the list.
      retryContext.setAttribute(bookingId, list);
      retryContextCache.put(bookingId, retryContext);
      throw e;
    }

  }
}

您可以看到,我尝试在retryContextCache中设置一些值,然后重新抛出异常,以使重试机制工作。
当重试发生时,它进入上面代码中提到的if条件。但是,retryContextCache.get(bookingId).getAttribute(bookingId)的值总是null
我在重试上下文中设置的值是否不正确?为什么不管用?

xuo3flqw

xuo3flqw1#

我是这样解决这个问题的。
我想在重试开始之前保存对象状态。RetryContext的作用域在抛出错误后立即启动。因此,我无法设置RetryContext中的值。
因此,我创建了一个带有StepScope的bean:

@Bean
@StepScope
public Map<String,BookingInfo> objectStateMap() {
 return new ConcurrentHashMap<>();
}

然后,我在需要的地方自动连接这个哈希。
然后,在catch块中重新抛出错误之前,我将修改后的对象写入此哈希。
然后,我为我的用例创建了一个SkipListener。这将在失败时捕获跳过的对象。然后,它将相应地完成各自的任务。

@Component
@Slf4j
public class CustomSkipListener {

  @Autowired
  Map<String, List<BookingInfo>> objectStateMap;

  @OnSkipInRead
  public void onSkipInRead(Throwable t) {
    log.error("Read has skipped because of error : " + t.getMessage());
  }

  @OnSkipInWrite
  public void onSkipInWrite(List<BookingInfo> item, Throwable t) {
    log.error("Write has skipped because of error : " + t.getMessage());
  }

  @OnSkipInProcess
  public void onSkipInProcess(String bookingId, Throwable t) {
    List<BookingInfo> bookingInfos = objectStateMap.get(bookingId);
    // do some tasks..
    objectStateMap.remove(bookingId);
  }

}

刚刚将此listener注册到主作业中。

相关问题