I have two services defined as follows:
public class RuleService {
    public ServiceError validate(MyRuleInput input){...}
    public void persist(MyRuleInput input){...}
}
public class ResultService {
    public ServiceError validate(MyResultInput input){...}
    public void persist(MyResultInput input){...}
}
Now I have a main controller class that receives a String containing JSON, which is deserialized into the class below.
public class MyDomainWrapper {
    String messageType;
    MyRuleInput ruleInput;
    MyResultInput resultInput;
}
If messageType == "RULE", I have to trigger a chain of tasks like this:
MyRuleInput ruleInput = domainWrapper.getRuleInput();
CompletableFuture.supplyAsync(() -> ruleService.validate(ruleInput), executor)
    .thenApplyAsync(error -> { ruleService.persist(ruleInput); return error; });
If messageType == "RESULT_SUMMARY", I trigger the same logic against the other service:
MyResultInput resultInput = domainWrapper.getResultInput();
CompletableFuture.supplyAsync(() -> resultService.validate(resultInput), executor)
    .thenApplyAsync(error -> { resultService.persist(resultInput); return error; });
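Here we can see that the task chain is the same in both cases, validate --> persist, but the service to use depends on messageType. Besides the task chain, I also have some error-handling logic, such as logging, that is common across the services.
What is the best way to reduce the code duplication?
The actual class, currently hard-coded, is shown below.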
@Service
@Slf4j
public class DataHealthExecutor /*extends EntityService*/ {
    private ObjectMapper mapper;
    private DataHealthRuleService ruleService;
    private DataHealthRuleResultService resultService;

    @Autowired
    public DataHealthExecutor(@Qualifier("objectMapper") ObjectMapper mapper, DataHealthRuleService ruleService, DataHealthRuleResultService resultService) {
        this.mapper = mapper;
        this.ruleService = ruleService;
        this.resultService = resultService;
    }

    //@Override
    public Boolean processMessage(String applicationNm, String datasourceNm, String message) throws ExecutionException, InterruptedException {
        CompletableFuture<Boolean> response = null;
        try {
            DataHealthResultSummaryWrapper ruleBObj = mapper.readValue(JsonSanitizer.sanitize(message), new TypeReference<DataHealthResultSummaryWrapper>() {});
            DataHealthMessageType messageType = ruleBObj.getMessageType();
            if (messageType.equals(DataHealthMessageType.RULE)) {
                SummaryRule rulePayload = ruleBObj.getSummaryRule();
                log.info("INFO: Received {} payload from {} {} for rule: {}...!", message, applicationNm, datasourceNm, rulePayload.getName());
                response = CompletableFuture.supplyAsync(() -> ruleService.validateRule(rulePayload, applicationNm, datasourceNm))
                        .thenApplyAsync(ruleError -> {
                            if (Objects.isNull(ruleError)) {
                                try {
                                    ruleService.persistRules(rulePayload, applicationNm, datasourceNm);
                                    return Boolean.TRUE;
                                } catch (DQPersistenceException | ParseException e) {
                                    e.printStackTrace();
                                    log.error("ERROR: Unable to persist {} {} {} payload for rule: {}", applicationNm, datasourceNm, message, rulePayload.getName());
                                    log.error("ERROR: PayloadType: {}. Failed payload message: {}", messageType, message);
                                    return Boolean.FALSE;
                                }
                            } else {
                                log.error("ERROR: Rule {} is having errors. Rule validation errors: {}", rulePayload.getName(), ruleError.toString());
                                return Boolean.TRUE;
                            }
                        });
            } else if (messageType.equals(DataHealthMessageType.RESULT_SUMMARY)) {
                ResultSummary resultSummaryPayload = ruleBObj.getResultSummary();
                log.info("INFO: Received {} payload from {} {} for ruleId: {}...!", message, applicationNm, datasourceNm, resultSummaryPayload.getRuleId());
                response = CompletableFuture.supplyAsync(() -> resultService.validateRuleResultSummary(resultSummaryPayload, applicationNm, datasourceNm))
                        .thenApplyAsync(resultSummaryError -> {
                            if (Objects.isNull(resultSummaryError)) {
                                try {
                                    Map<String, String> resultResponse = resultService.persistRuleResultSummary(resultSummaryPayload, applicationNm, datasourceNm);
                                    log.info("INFO: {} {} {} persisted with response: {}", applicationNm, datasourceNm, messageType, resultResponse.get("status"));
                                    return Boolean.TRUE;
                                } catch (DQPersistenceException | ParseException e) {
                                    e.printStackTrace();
                                    log.error("ERROR: Unable to persist {} {} {} payload for ruleId: {}", applicationNm, datasourceNm, message, resultSummaryPayload.getRuleId());
                                    log.error("ERROR: PayloadType: {}. Failed payload message: {}", messageType, message);
                                    return Boolean.FALSE;
                                }
                            } else {
                                log.error("ERROR: ResultSummary message for ruleId {} is having errors. ResultSummary validation errors: {}", resultSummaryPayload.getRuleId(), resultSummaryError.toString());
                                return Boolean.TRUE;
                            }
                        });
            } else if (messageType.equals(DataHealthMessageType.RESULT)) {
                /**DO NOTHING. For future release**/
                log.error("ERROR: Received {} messageType. No processing needed.", messageType.toString());
            } else {
                log.error("ERROR: Payload should be RULE, RESULT_SUMMARY or RESULT only. Invalid payload type {}.", messageType);
            }
        } catch (Exception e) {
            log.error("ERROR: Unable to parse payload sent. Payload: {}", message);
        }
        // response stays null for RESULT, unknown types, and parse failures; guard against a NullPointerException
        return response != null ? response.get() : Boolean.FALSE;
    }
}
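One possible way to factor out the shared validate --> persist chain is sketched below. It is only a minimal illustration under stated assumptions, not a drop-in replacement: `MessageHandler`, `process`, and `InMemoryHandler` are hypothetical names that do not exist in the code above, `System.err` stands in for the `log` calls, and the toy handler replaces the real rule and result services. The return values mirror the class above (validation errors are logged and reported as `TRUE`, persistence failures as `FALSE`).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PipelineSketch {

    /** Hypothetical abstraction: one validate/persist pair per message type. */
    interface MessageHandler<T> {
        String validate(T payload);               // null means "no validation errors"
        void persist(T payload) throws Exception; // may fail with a checked exception
        String describe(T payload);               // short identifier used in log lines
    }

    /** The shared chain: validate first, persist only when validation passed. */
    static <T> CompletableFuture<Boolean> process(MessageHandler<T> handler, T payload, Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> handler.validate(payload), executor)
                .thenApplyAsync(error -> {
                    if (error != null) {
                        // Same convention as the original class: log the validation error, report TRUE
                        System.err.println("Validation errors for " + handler.describe(payload) + ": " + error);
                        return Boolean.TRUE;
                    }
                    try {
                        handler.persist(payload);
                        return Boolean.TRUE;
                    } catch (Exception e) {
                        // Common error handling lives here once instead of once per message type
                        System.err.println("Unable to persist " + handler.describe(payload) + ": " + e.getMessage());
                        return Boolean.FALSE;
                    }
                }, executor);
    }

    /** Toy handler standing in for a real service; it stores payloads in memory. */
    static class InMemoryHandler implements MessageHandler<String> {
        final List<String> stored = new CopyOnWriteArrayList<>();

        @Override
        public String validate(String payload) {
            return payload.trim().isEmpty() ? "payload is blank" : null;
        }

        @Override
        public void persist(String payload) throws Exception {
            if (payload.startsWith("fail")) {
                throw new Exception("simulated persistence failure");
            }
            stored.add(payload);
        }

        @Override
        public String describe(String payload) {
            return "'" + payload + "'";
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            InMemoryHandler rules = new InMemoryHandler();
            System.out.println(process(rules, "rule-1", executor).join()); // valid and persisted
            System.out.println(process(rules, "", executor).join());       // validation error, nothing persisted
            System.out.println(process(rules, "fail-2", executor).join()); // persistence failure
            System.out.println(rules.stored);
        } finally {
            executor.shutdown();
        }
    }
}
```

With something along these lines, each branch in `processMessage` could shrink to a single call such as `response = process(ruleHandler, rulePayload, executor)`, where the handler for each message type delegates to the existing service methods. Whether a handler interface, a map keyed by message type, or plain lambdas fits best depends on how many message types are expected.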