ApacheKafka:在版本0.10中只有一次

bfhwhh0e  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(297)

为了实现kafka消费者对消息的一次处理,我一次提交一条消息,如下所示

public void commitOneRecordConsumer(long seconds) {
        KafkaConsumer<String, String> consumer = consumerConfigFactory.getConsumerConfig();

        try {

            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                try {
                    for (ConsumerRecord<String, String> record : records) {

                        processingService.process(record);

                        consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset() + 1)));

                        System.out.println("Committed Offset" + ": " + record.offset());

                    }
                } catch (CommitFailedException e) {
                    // application specific failure handling
                }
            }
        } finally {
            consumer.close();
        }
    }

上面的代码将消息的处理异步地委托给下面的另一个类。

@Service
public class ProcessingService {

    @Async
    public void process(ConsumerRecord<String, String> record) throws InterruptedException {
        Thread.sleep(5000L);
        Map<String, Object> map = new HashMap<>();
        map.put("partition", record.partition());
        map.put("offset", record.offset());
        map.put("value", record.value());
        System.out.println("Processed" + ": " + map);
    }

}

但是,这仍然不能保证只传递一次,因为如果处理失败,它可能仍然提交其他消息,并且以前的消息将永远不会被处理和提交,我在这里的选项是什么?

yhxst69z

yhxst69z1#

我认为Kafka0.10.x本身可以实现一次处理。但有一些陷阱。我在分享这本书的高层次理念。相关内容见: Seek and Exactly Once Processing 第四章: Kafka Consumers - Reading Data from Kafka . 你可以用一个(免费的)safaribooksonline帐户查看那本书的内容,或者在它发行后购买,或者从其他渠道获得它,我们将不谈这些。
想法:
想想这个常见的场景:应用程序从kafka读取事件,处理数据,然后将结果存储在数据库中。假设我们真的不想丢失任何数据,也不想在数据库中存储相同的结果两次。
如果有一种方法可以在一个原子操作中同时存储记录和偏移量,那么这是可行的。要么记录和偏移量都提交,要么两者都不提交。为了实现这一点,我们需要在一个事务中将记录和偏移量写入数据库。然后我们就知道要么我们已经处理完记录,并且提交了偏移量,要么我们还没有提交,记录将被重新处理。
现在唯一的问题是:如果记录存储在数据库中而不是kafka中,那么当它被分配一个分区时,我们的消费者如何知道从哪里开始读取?这到底是什么 seek() 可用于。当使用者启动或分配新分区时,它可以在数据库中查找偏移量并 seek() 去那个地方。
书中的示例代码:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        commitDBTransaction(); 
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for(TopicPartition partition: partitions)
        consumer.seek(partition, getOffsetFromDB(partition)); 
    }
}

consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);

for (TopicPartition partition: consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition));   

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        processRecord(record);
        storeRecordInDB(record);
        storeOffsetInDB(record.topic(), record.partition(), record.offset()); 
    }
    commitDBTransaction();
}
o0lyfsai

o0lyfsai2#

apachekafka0.11.0.0刚刚发布,现在支持一次交付。
http://kafka.apache.org/documentation/#upgrade_11_exactly_once_semantics
https://cwiki.apache.org/confluence/display/kafka/kip-98+-+exactly+once+delivery+and+transactional+messaging

62o28rlo

62o28rlo3#

0.10.2及更高版本的原始答案(对于0.11及更高版本,请参阅答案)
目前,Kafka无法提供一次即用的处理。如果在成功处理消息之后提交消息,则可以至少进行一次处理;如果在成功处理消息之后直接提交消息,则最多可以进行一次处理 poll() 在你开始处理之前。
(另见本节“交货保证”一段。)http://docs.confluent.io/3.0.0/clients/consumer.html#synchronous-提交)
但是,如果您的处理是幂等的,则至少有一次保证是“足够好的”,即即使您处理一个记录两次,最终结果也将是相同的。幂等处理的例子是向键值存储添加消息。即使添加两次相同的记录,第二次插入也只会替换当前的第一个键值对,kv存储区中仍然包含正确的数据。
在上面的示例代码中,您更新了 HashMap 这将是一个幂等运算。即使在失败的情况下您可能有不一致的状态,例如只有两个 put 调用在崩溃前执行。但是,这种不一致的状态将在重新处理同一记录时修复。
呼叫 println() 但不是幂等的,因为这是一个具有“副作用”的操作。但我想打印出来只是为了调试。
另一种选择是,您需要在用户代码中实现事务语义,在出现故障时需要“撤消”(部分执行)操作。总的来说,这是一个难题。
apachekafka 0.11+的更新(对于0.11之前的版本,请参阅上面的答案)
自0.11以来,apachekafka支持幂等生产者、事务生产者和使用kafka流的一次处理。它还增加了 "read_committed" 模式,以便使用者仅读取已提交的消息(以及删除/筛选已中止的消息)。
https://kafka.apache.org/documentation/#semantics
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
https://www.confluent.io/blog/transactions-apache-kafka/
https://www.confluent.io/blog/enabling-exactly-kafka-streams/

相关问题