java - How to invoke a common workflow on two different services based on an identifier/message type?

sqxo8psd  posted on 2021-06-30  in Java

I have two services defined as follows:

public class RuleService {
    public ServiceError validate(MyRuleInput input){...}
    public void persist(MyRuleInput input){...}
}

public class ResultService {
    public ServiceError validate(MyResultInput input){...}
    public void persist(MyResultInput input){...}
}

Now I have a main controller class that receives a String containing JSON, which is deserialized into the class below.

public class MyDomainWrapper {
    String messageType;
    MyRuleInput ruleInput;
    MyResultInput resultInput;
}

If messageType is "RULE", I have to trigger a chain of tasks as shown below.

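MyRuleInput ruleInput = domainWrapper.getRuleInput();
// ruleService is an instance of RuleService; persist returns void, so the second stage is thenAcceptAsync
CompletableFuture.supplyAsync(() -> ruleService.validate(ruleInput), executor)
        .thenAcceptAsync(error -> ruleService.persist(ruleInput));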

If messageType is "RESULT_SUMMARY", the same logic has to be triggered on the result service.

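MyResultInput resultInput = domainWrapper.getResultInput();
// resultService is an instance of ResultService; persist returns void, so the second stage is thenAcceptAsync
CompletableFuture.supplyAsync(() -> resultService.validate(resultInput), executor)
        .thenAcceptAsync(error -> resultService.persist(resultInput));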

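Here we can see that the task chain is the same in both cases, validate --> persist, but the service to call depends on messageType. Besides the task chain, I also have some error-handling logic, such as logging, that is common across the services.
What is the best way to reduce the code duplication?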
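
One possible direction, shown only as a rough sketch: wrap the shared validate --> persist chain in a small generic class and look it up by messageType in a map. The names Workflow, Dispatcher and WorkflowSketch are hypothetical, the service and input bodies are simplified stand-ins, and returning FALSE on a validation error is an assumption rather than a requirement from the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

class WorkflowSketch {

    // Simplified stand-ins for the question's types; fields and bodies are assumptions.
    static class ServiceError {
        final String reason;
        ServiceError(String reason) { this.reason = reason; }
    }
    static class MyRuleInput {
        final String name;
        MyRuleInput(String name) { this.name = name; }
    }
    static class MyResultInput {
        final String ruleId;
        MyResultInput(String ruleId) { this.ruleId = ruleId; }
    }
    static class MyDomainWrapper {
        String messageType;
        MyRuleInput ruleInput;
        MyResultInput resultInput;
    }
    static class RuleService {
        final List<String> saved = new ArrayList<>();
        ServiceError validate(MyRuleInput input) {
            return input.name.isEmpty() ? new ServiceError("empty name") : null;
        }
        void persist(MyRuleInput input) { saved.add(input.name); }
    }
    static class ResultService {
        final List<String> saved = new ArrayList<>();
        ServiceError validate(MyResultInput input) {
            return input.ruleId.isEmpty() ? new ServiceError("empty ruleId") : null;
        }
        void persist(MyResultInput input) { saved.add(input.ruleId); }
    }

    /** The shared validate -> persist chain, parameterised by the input type T. */
    static class Workflow<T> {
        private final Function<MyDomainWrapper, T> extract;
        private final Function<T, ServiceError> validate;
        private final Consumer<T> persist;

        Workflow(Function<MyDomainWrapper, T> extract,
                 Function<T, ServiceError> validate,
                 Consumer<T> persist) {
            this.extract = extract;
            this.validate = validate;
            this.persist = persist;
        }

        CompletableFuture<Boolean> run(MyDomainWrapper wrapper, Executor executor) {
            T input = extract.apply(wrapper);
            return CompletableFuture
                    .supplyAsync(() -> validate.apply(input), executor)
                    .thenApplyAsync(error -> {
                        if (error != null) {
                            return Boolean.FALSE; // assumption: a validation error skips persist
                        }
                        persist.accept(input);
                        return Boolean.TRUE;
                    }, executor)
                    .exceptionally(ex -> Boolean.FALSE); // common failure handling lives here
        }
    }

    /** Picks the workflow registered for wrapper.messageType. */
    static class Dispatcher {
        private final Map<String, Workflow<?>> workflows = new HashMap<>();
        private final Executor executor;

        Dispatcher(RuleService ruleService, ResultService resultService, Executor executor) {
            this.executor = executor;
            workflows.put("RULE", new Workflow<MyRuleInput>(
                    w -> w.ruleInput, ruleService::validate, ruleService::persist));
            workflows.put("RESULT_SUMMARY", new Workflow<MyResultInput>(
                    w -> w.resultInput, resultService::validate, resultService::persist));
        }

        CompletableFuture<Boolean> process(MyDomainWrapper wrapper) {
            Workflow<?> workflow = workflows.get(wrapper.messageType);
            if (workflow == null) {
                return CompletableFuture.completedFuture(Boolean.FALSE); // unknown message type
            }
            return workflow.run(wrapper, executor);
        }
    }
}
```

With this shape, supporting another message type would mean registering one more Workflow in the Dispatcher constructor, while logging and error handling stay in the single run method.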
The actual class, with the branches hard-coded, is provided below.

@Service
@Slf4j
public class DataHealthExecutor /*extends EntityService*/ {

    private ObjectMapper mapper;
    private DataHealthRuleService ruleService;
    private DataHealthRuleResultService resultService;

    @Autowired
    public DataHealthExecutor(@Qualifier("objectMapper") ObjectMapper mapper, DataHealthRuleService ruleService, DataHealthRuleResultService resultService) {
        this.mapper=mapper;
        this.ruleService = ruleService;
        this.resultService = resultService;
    }

    //@Override
    public Boolean processMessage(String applicationNm, String datasourceNm, String message) throws ExecutionException, InterruptedException {
        CompletableFuture<Boolean> response = null;
        try{
            DataHealthResultSummaryWrapper ruleBObj = mapper.readValue(JsonSanitizer.sanitize(message), new TypeReference<DataHealthResultSummaryWrapper>() {
            });
            DataHealthMessageType messageType = ruleBObj.getMessageType();
            if(messageType.equals(DataHealthMessageType.RULE)){
                SummaryRule rulePayload = ruleBObj.getSummaryRule();
                log.info("INFO: Received {} payload from {} {} for rule: {}...!",message,applicationNm, datasourceNm,rulePayload.getName());
                response = CompletableFuture.supplyAsync(() -> ruleService.validateRule(rulePayload, applicationNm, datasourceNm))
                                 .thenApplyAsync(ruleError -> {
                                     if (Objects.isNull(ruleError)) {
                                         try {
                                             ruleService.persistRules(rulePayload, applicationNm, datasourceNm);
                                             return Boolean.TRUE;
                                         } catch (DQPersistenceException | ParseException e) {
                                             e.printStackTrace();
                                             log.error("ERROR: Unable to persist {} {} {} payload for rule: {}", applicationNm, datasourceNm, message, rulePayload.getName());
                                             log.error("ERROR: PayloadType: {}. Failed payload message: {}", messageType, message);
                                             return Boolean.FALSE;
                                         }
                                     } else {
                                         log.error("ERROR: Rule {} is having errors. Rule validation errors: {}", rulePayload.getName(), ruleError.toString());
                                         return Boolean.TRUE;
                                     }
                                 });
            }
            else if(messageType.equals(DataHealthMessageType.RESULT_SUMMARY)){
                ResultSummary resultSummaryPayload = ruleBObj.getResultSummary();
                log.info("INFO: Received {} payload from {} {} for ruleId: {}...!",message,applicationNm, datasourceNm,resultSummaryPayload.getRuleId());
                response = CompletableFuture.supplyAsync(() -> resultService.validateRuleResultSummary(resultSummaryPayload,applicationNm, datasourceNm))
                        .thenApplyAsync(resultSummaryError -> {
                            if (Objects.isNull(resultSummaryError)) {
                                try {
                                    Map<String, String> resultResponse = resultService.persistRuleResultSummary(resultSummaryPayload, applicationNm, datasourceNm);
                                    log.info("INFO: {} {} {} persisted with response: {}", applicationNm, datasourceNm, messageType, resultResponse.get("status"));
                                    return Boolean.TRUE;
                                } catch (DQPersistenceException | ParseException e) {
                                    e.printStackTrace();
                                    log.error("ERROR: Unable to persist {} {} {} payload for ruleId: {}", applicationNm, datasourceNm, message, resultSummaryPayload.getRuleId());
                                    log.error("ERROR: PayloadType: {}. Failed payload message: {}", messageType, message);
                                    return Boolean.FALSE;
                                }
                            } else {
                                log.error("ERROR: ResultSummary message for ruleId {} is having errors. ResultSummary validation errors: {}", resultSummaryPayload.getRuleId(), resultSummaryError.toString());
                                return Boolean.TRUE;
                            }
                        });
            }
            else if(messageType.equals(DataHealthMessageType.RESULT)){
                /**DO NOTHING. For future release**/
                log.error("ERROR: Received {} messageType. No processing needed.", messageType.toString());
            }
            else{
                log.error("ERROR: Payload should be RULE, RESULT_SUMMARY or RESULT only. Invalid payload type {}.", messageType);
            }

        } catch(Exception e){
            log.error("ERROR: Unable to parse payload sent. Payload: {}", message);
        }

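        // response stays null for RESULT, unknown types, and parse failures; guard against a NullPointerException
        return response != null ? response.get() : Boolean.FALSE;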

    }
}
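
A smaller step that stays closer to the class above might be to pull the duplicated chain into one generic helper that takes the validate and persist calls as lambdas. This is only a sketch: ChainHelperSketch, ThrowingAction and validateThenPersist are hypothetical names, System.err stands in for the Slf4j logger so the snippet is self-contained, and the helper catches Exception where the original catches DQPersistenceException | ParseException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class ChainHelperSketch {

    /** Like Runnable, but allowed to throw checked exceptions (for example from a persist call). */
    @FunctionalInterface
    interface ThrowingAction {
        void run() throws Exception;
    }

    /**
     * Shared validate -> persist chain. Mirrors the return values of the class above:
     * TRUE when persisted or when validation reported errors, FALSE when persisting failed.
     */
    static <E> CompletableFuture<Boolean> validateThenPersist(
            String label, Supplier<E> validate, ThrowingAction persist) {
        return CompletableFuture.supplyAsync(validate)
                .thenApplyAsync(error -> {
                    if (error != null) {
                        System.err.println("ERROR: " + label + " is having errors: " + error);
                        return Boolean.TRUE;
                    }
                    try {
                        persist.run();
                        return Boolean.TRUE;
                    } catch (Exception e) {
                        System.err.println("ERROR: Unable to persist " + label + ": " + e);
                        return Boolean.FALSE;
                    }
                });
    }
}
```

Each branch of processMessage could then shrink to a single call, for example validateThenPersist("rule " + rulePayload.getName(), () -> ruleService.validateRule(rulePayload, applicationNm, datasourceNm), () -> ruleService.persistRules(rulePayload, applicationNm, datasourceNm)), leaving only the payload extraction and the initial log line specific to each message type.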
