kafka生产者故障转移机制和推送到主题的数据验证

uxhixvfz  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(291)

我已经写了一个代码,每天将数据推送到Kafka主题,但有几个问题,我不确定这段代码是否能够处理。我的职责是从保存1天数据的活动表中推送完整的数据(每天早上刷新)
我的代码将查询“select*from mytable”并将它一个接一个地推送到kafka主题,就像推之前一样,我需要验证/更改每一行并推送到主题。
下面是我的制作人发送代码。

Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sBOOTSTRAP_SERVERS_CONFIG);
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put("acks", "all");
        configProperties.put("retries", 0);
        configProperties.put("batch.size", 15000);
        configProperties.put("linger.ms", 1);
        configProperties.put("buffer.memory", 30000000);
        @SuppressWarnings("resource")
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);
        System.out.println("Starting Kafka producer job  " + new Date());
        producer.send(new ProducerRecord<String, String>(eventName, jsonRec.toString()), new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                }
            }
        });

现在,我不知道如何在失败的情况下将数据再次推回到主题中。因为我已经从表中选择了所有的记录,但很少有失败的记录,我不知道是哪一个。
下面是我想说的
如何只处理那些没有被推送的记录,以避免重复记录被推送(避免冗余)。
如何验证推送的记录与表中的完全相同。我是说数据的完整性。比如数据的大小和被推送的记录数。

wlp8pajw

wlp8pajw1#

你可以用 configProperties.put("enable.idempotence", true); -它将尝试重试失败的消息,但确保每个记录中只有一条保存在kafka中。请注意,这意味着
retries>0 acks=all 以及 max.in.flight.requests.per.connection >=0. 详情请查收https://kafka.apache.org/documentation/.
对于第二个问题-如果你的意思是你需要保存所有记录或没有,那么你必须使用Kafka事务,这带来了更多的问题,我建议阅读https://www.confluent.io/blog/transactions-apache-kafka/

相关问题