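I wrote a program that consumes data from Kafka. When the buffered records reach 1000, or every 30 seconds, it calls a downstream interface to push them as a batch. The results show that data is either lost or duplicated.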
Kafka listener:
@KafkaListener(topics = {"#{'${kafka.consumer.topics}'.split(',')}"})
public void listen(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
    switch (record.topic()) {
        case "touty_engine_action_coupon_json":
            couponListener.exeKafka(record.value().toString(), acknowledgment);
            break;
        case "touty_engine_action_autoTag_json":
            autoTagListener.exeKafka(record.value().toString(), acknowledgment);
            break;
    }
}
CouponListener:
private static Map<String, List<CouponEntity>> entityMap = new HashMap<>();

synchronized public void exeKafka(String msg, Acknowledgment acknowledgment) {
    JSONObject object = JSON.parseObject(msg);
    JSONObject idObj = JSON.parseObject(object.get("id").toString());
    JSONObject settingsObj = JSON.parseObject(object.get("settings").toString());
    JSONObject contextObj = JSON.parseObject(object.get("context").toString());
    JSONObject mediaObj = JSON.parseObject(settingsObj.get("media").toString());
    try {
        if (null != contextObj.get("mobile") && StringUtils.isNotBlank(contextObj.get("mobile").toString())) {
            CouponEntity entity = new CouponEntity();
            //entity.set...
            String batchKey = entity.getFlowId() + entity.getNodeKey() + entity.getCouponId();
            if (entityMap.containsKey(batchKey)) {
                List<CouponEntity> list = entityMap.get(batchKey);
                if (list.size() >= 1000) {
                    sendPostRequest(list);
                    list.clear();
                } else {
                    list.add(entity);
                }
                entityMap.put(batchKey, list);
            } else {
                List<CouponEntity> list = new ArrayList<>();
                list.add(entity);
                entityMap.put(batchKey, list);
            }
        } else {
            logger.warn("userId:{},Mobile is null", contextObj.get("custNo").toString());
        }
    } catch (Exception e) {
        logger.error("coupon error", e);
    } finally {
        acknowledgment.acknowledge();
    }
}
CouponListener, scheduled send method:
@Scheduled(cron = "0/30 * * * * ?")
public void timerSend() {
    if (entityMap.size() > 0) {
        Map<String, List<CouponEntity>> map = new HashMap<>();
        synchronized (entityMap) {
            map.putAll(entityMap);
            entityMap.clear();
        }
        for (String key : map.keySet()) {
            List<CouponEntity> list = map.get(key);
            logger.info("sendPostRequest 30s,key:{},size:{}", key, list.size());
            sendPostRequest(list);
        }
    }
}
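A few things in the code above look like they could explain the symptoms, though this is a reading of the snippet rather than a confirmed diagnosis. exeKafka is a synchronized method, so it locks on the listener instance, while timerSend locks on entityMap; the two do not exclude each other, so a list can be added to while it is being copied and sent. Also, when a list already holds 1000 entries, the branch that sends and clears it never adds the current entity, so that record is dropped. Finally, the offset is acknowledged in finally before the buffered record has actually been pushed downstream. Below is a minimal sketch of the "flush at N items or on a timer" idea with a single lock. BatchBuffer and its methods are hypothetical names introduced for illustration and are not part of the original code or of Spring Kafka.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper: buffers items per key and hands out a batch either
 * when a key reaches maxBatchSize or when drainAll() is called by a timer.
 * Every access to the map goes through the same lock object.
 */
class BatchBuffer<T> {
    private final Object lock = new Object();
    private final Map<String, List<T>> batches = new HashMap<>();
    private final int maxBatchSize;

    BatchBuffer(int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Adds the item first and checks the size afterwards, so the item that
     * fills the batch is part of it. Returns the full batch (now owned by
     * the caller), or an empty list if the batch is not full yet.
     */
    List<T> add(String key, T item) {
        synchronized (lock) {
            List<T> list = batches.computeIfAbsent(key, k -> new ArrayList<>());
            list.add(item);
            if (list.size() < maxBatchSize) {
                return Collections.emptyList();
            }
            batches.remove(key); // detach so nobody else can touch this list
            return list;
        }
    }

    /** Removes and returns everything buffered so far (for the timer). */
    Map<String, List<T>> drainAll() {
        synchronized (lock) {
            Map<String, List<T>> drained = new HashMap<>(batches);
            batches.clear();
            return drained;
        }
    }

    public static void main(String[] args) {
        BatchBuffer<String> buffer = new BatchBuffer<>(3);
        buffer.add("flowA", "m1");
        buffer.add("flowA", "m2");
        buffer.add("flowB", "m3");
        System.out.println(buffer.add("flowA", "m4")); // prints [m1, m2, m4]
        System.out.println(buffer.drainAll());         // prints {flowB=[m3]}
    }
}
```

With something like this, exeKafka would call add(batchKey, entity) and pass any non-empty result to sendPostRequest, and timerSend would call drainAll() and send each list. The sends happen outside the lock, on lists that are no longer reachable from the buffer. This sketch does not address the acknowledgment timing: if records must survive a crash or a failed push, the offset would likely need to be acknowledged only after the batch containing it has been sent successfully.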