spring引导使用kafka批量推送、脏读和丢失计数问题

pqwbnv8z  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(400)

我写了一个程序来消耗Kafka数据,数据量达到1000,或者每30秒调用一次下游接口进行批量推送,结果显示数据会丢失或者有重复数据
Kafka列斯泰纳:

@KafkaListener(topics = {"#{'${kafka.consumer.topics}'.split(',')}"})
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        switch (record.topic()) {
            case "touty_engine_action_coupon_json":
                couponListener.exeKafka(record.value().toString(), acknowledgment);
                break;
            case "touty_engine_action_autoTag_json":
                autoTagListener.exeKafka(record.value().toString(), acknowledgment);
                break;
        }
    }

耦合侦听器

private static Map<String, List<CouponEntity>> entityMap = new HashMap<>();

    synchronized public void exeKafka(String msg, Acknowledgment acknowledgment) {

        JSONObject object = JSON.parseObject(msg);
        JSONObject idObj = JSON.parseObject(object.get("id").toString());
        JSONObject settingsObj = JSON.parseObject(object.get("settings").toString());
        JSONObject contextObj = JSON.parseObject(object.get("context").toString());
        JSONObject mediaObj = JSON.parseObject(settingsObj.get("media").toString());

        try {
            if (null != contextObj.get("mobile") && StringUtils.isNotBlank(contextObj.get("mobile").toString())) {
                CouponEntity entity = new CouponEntity();
                //entity.set...
                String batchKey = entity.getFlowId() + entity.getNodeKey() + entity.getCouponId();
                if (entityMap.containsKey(batchKey)) {
                    List<CouponEntity> list = entityMap.get(batchKey);
                    if (list.size() >= 1000) {
                        sendPostRequest(list);
                        list.clear();
                    } else {
                        list.add(entity);
                    }
                    entityMap.put(batchKey, list);
                } else {
                    List<CouponEntity> list = new ArrayList<>();
                    list.add(entity);
                    entityMap.put(batchKey, list);
                }
            } else {
                logger.warn("userId:{},Mobile is null", contextObj.get("custNo").toString());
            }
        } catch (Exception e) {
            logger.error("coupon error", e);
        } finally {
            acknowledgment.acknowledge();
        }
    }

couponlistener,定时发送方法

@Scheduled(cron = "0/30 * * * * ?")
public void timerSend() {
    if (entityMap.size() > 0) {
        Map<String, List<CouponEntity>> map = new HashMap<>();
        synchronized (entityMap) {
            map.putAll(entityMap);
            entityMap.clear();
        }
        for (String key : map.keySet()) {
            List<CouponEntity> list = map.get(key);
            logger.info("sendPostRequest 30s,key:{},size:{}", key, list.size());
            sendPostRequest(list);
        }
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题