我们正试图消费制作人的唱片。它现在有2个分区,但是将来可以增加它来提高我们的吞吐量。我们正试图消费记录与2消费者线程,但我们得到重复。我们的生产商说,他们也包括关键,但它仍然不能解决问题。不知道为什么?
但是,从用户端来看,由于重复,我们的整个流程周期被触发了两次,这是我们想要避免的。我们担心的是,如果将来增加分区,也会增加重复。
我们的流程周期:
从流获取记录-->基于键向上插入表-->基于键获取记录并将其插入表-->调用api并更新记录
日志:
coming from stream :004582777into offset 500405and partition 0
coming from stream :004582777into offset 499525and partition 1
skipping tax id 004582777
skipping tax id 004582777
coming from stream :002402419into offset 499526and partition 1
coming from stream :002402419into offset 500406and partition 0
skipping tax id 002402419
skipping tax id 002402419
coming from stream :010546936into offset 499527and partition 1
coming from stream :010546936into offset 500407and partition 0
skipping tax id 010546936
skipping tax id 010546936
coming from stream :010646378into offset 500408and partition 0
coming from stream :010646378into offset 499528and partition 1
skipping tax id 010646378
skipping tax id 010646378
coming from stream :010866219into offset 500409and partition 0
coming from stream :010866219into offset 499529and partition 1
skipping tax id 010866219
skipping tax id 010866219
coming from stream :019541747into offset 499530and partition 1
coming from stream :019541747into offset 500410and partition 0
skipping tax id 019541747
skipping tax id 019541747
coming from stream :020438119into offset 500411and partition 0
coming from stream :020438119into offset 499531and partition 1
skipping tax id 020438119
skipping tax id 020438119
coming from stream :020594385into offset 499532and partition 1
coming from stream :020594385into offset 500412and partition 0
skipping tax id 020594385
skipping tax id 020594385
coming from stream :043514479into offset 500413and partition 0
coming from stream :043514479into offset 499533and partition 1
skipping tax id 043514479
skipping tax id 043514479
coming from stream :030446242into offset 500414and partition 0
coming from stream :030446242into offset 499534and partition 1
record count is more than zero :1 for tax id:030446242 <--- we are calling API 2 times because of 2 ocurance
record count is more than zero :1 for tax id:030446242
即使我们从不同的分区得到了重复的数据,我们如何确保只选取这个记录的出现点呢?由于这两个记录都由2个使用者线程并行处理,因此对于某些记录,它将在表中捕获这两个示例,而对于某些记录,它仅捕获1个示例。
代码:
@KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {
try {
prov_tin_number = record.value().get("providerTinNumber").toString();
prov_tin_type = record.value().get("providerTINType").toString();
enroll_type = record.value().get("enrollmentType").toString();
vcp_prov_choice_ind = record.value().get("vcpProvChoiceInd").toString();
error_flag = "";
dataexecutor.peStremUpsertTbl(prov_tin_number, prov_tin_type, enroll_type, vcp_prov_choice_ind, error_flag,
record.partition(), record.offset());
acknowledgement.acknowledge();
}catch (Exception ex) {
System.out.println(record);
System.out.println(ex.getMessage());
}
}
1条答案
按热度按时间umuewwlo1#
从不同分区获取副本
Kafka对数据一无所知;您将获得所有分区/偏移处的所有记录。
您可以添加
FilterStrategy
到侦听器容器工厂以筛选出重复项。https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#filtering-消息