只有一次使用Kafka

mfpqipee  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(195)

我正试图实现一个完全一次生产者在Kafka。我已经阅读了文档并理解了它,以便:
只要生产者是幂等的
事务是使用事务api的(使用init.transaction的两阶段事务。。。以此类推)
消费者的行为被设置为“read\u committed”
我只会得到一次。
这是我目前的代码。如果我执行这个代码,我的主题总是重复的。我知道我关闭了生产者,幂等的性质就丢失了。因此,我创建了更多的记录,这些记录是在同一个提交中发送的,只有一个生产者,但是结果是一样的,我做错了什么,我想我需要实现sendoffsetstotransactions(),但是我不确定。

public class Producer_test {
    public static void main(String[] args) {
        String bootstrapServers ="127.0.0.1:9092";
        String groupId="test_group";
        String topic="test_topic";

        // create Producer properties
        Properties properties= new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG,"all");
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"1");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
        properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG,"ClientId");
        properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"100");

        // create the producer
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        // create a producer record
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "1");

        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(record);
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            producer.close();
        } catch (KafkaException e) {
            producer.abortTransaction();
        }
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题