如何从一开始就使用kafka consumer api读取数据?

kqhtkvqz  于 2021-06-08  发布在  Kafka
关注(0)|答案(10)|浏览(513)

请任何人告诉我如何阅读消息使用Kafka消费api从一开始,每次我运行消费者。

d4so4syb

d4so4syb1#

一种可能的解决方案是在订阅一个或多个主题时使用ConsumerBalanceListener的实现。ConsumerBalanceListener包含在分配新分区或从使用者中删除新分区时的回调方法。下面的代码示例说明了这一点:

public class SkillsConsumer {

private String topic;

private KafkaConsumer<String, String> consumer;

private static final int POLL_TIMEOUT = 5000;

public SkillsConsumer(String topic) {
    this.topic = topic;
    Properties properties = ConsumerUtil.getConsumerProperties();
    properties.put("group.id", "consumer-skills");
    this.consumer = new KafkaConsumer<>(properties);
    this.consumer.subscribe(Collections.singletonList(this.topic),
            new PartitionOffsetAssignerListener(this.consumer));
    }
}

public class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {

private KafkaConsumer consumer;

public PartitionOffsetAssignerListener(KafkaConsumer kafkaConsumer) {
    this.consumer = kafkaConsumer;
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    //reading all partitions from the beginning
    for(TopicPartition partition : partitions)
        consumer.seekToBeginning(partition);
}

}
现在,无论何时将分区分配给使用者,都将从头开始读取每个分区。

zmeyuzjn

zmeyuzjn2#

如果您更具体地使用java consumer api org.apache.kafka.clients.consumer.consumer,可以尝试seek*方法。

consumer.seekToBeginning(consumer.assignment())

这里,consumer.assignment()返回分配给给定使用者的所有分区,seekToBegining将从给定分区集合的最早偏移量开始。

cwdobuhd

cwdobuhd3#

在使用高级使用者集时 props.put("auto.offset.reset", "smallest"); 在创造 ConsumerConfig

mbjcgjjk

mbjcgjjk4#

所以对我来说,有效的是上面建议的结合。关键的变化是包括

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

每次都有一个随机生成的组id。但这一点对我不起作用。出于某种原因,我第一次调查消费者时,他们没有任何记录。我必须破解它才能让它工作-

consumer.poll(0); // without this the below statement never got any records
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(100));

我是新来Kafka,不知道为什么会发生这种情况,但对于其他人仍然试图让这项工作,希望这有帮助。

hgtggwj0

hgtggwj05#

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

如果您只是避免保存任何补偿,消费者将始终在开始时重置。

0pizxfdo

0pizxfdo6#

总是从偏移量0读取而不每次创建新的groupid。

// ... Assuming the props have been set properly.
    // ... enable.auto.commit and auto.offset.reset as default

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(topic));
    consumer.poll(0);  // without this, the assignment will be empty. 
    consumer.assignment().forEach(t -> {
        System.out.printf("Set %s to offset 0%n", t.toString());
        consumer.seek(t, 0);
    });
    while (true) {
     // ... consumer polls messages as usual.
    }
ycggw6v2

ycggw6v27#

这样做的一个选择是每次启动时都有一个唯一的组id,这意味着kafka会从一开始就向您发送主题中的消息。在设置属性时执行以下操作: KafkaConsumer :

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

另一种选择是使用 consumer.seekToBeginning(consumer.assignment()) 但是,除非kafka首先通过让消费者调用poll方法从消费者那里获得心跳信号,否则这是行不通的。所以打电话 poll() ,然后做一个 seekToBeginning() 然后再打电话 poll() 如果你一开始就想要所有的记录。这有点像黑客,但从0.9版本开始,这似乎是最可靠的方法。

// At this point, there is no heartbeat from consumer so seekToBeinning() wont work
// So call poll()
consumer.poll(0);
// Now there is heartbeat and consumer is "alive"
consumer.seekToBeginning(consumer.assignment());
// Now consume
ConsumerRecords<String, String> records = consumer.poll(0);
gr8qqesn

gr8qqesn8#

另一个选择是保持用户代码简单,并使用命令行工具从外部控制偏移管理 kafka-consumer-groups 那是Kafka带来的。
每次启动消费者之前,您都会

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
 --execute --reset-offsets \
 --group myConsumerGroup \
 --topic myTopic \
 --to-earliest

根据您的需求,您可以使用该工具重置主题的每个分区的偏移量。帮助功能或文档说明了以下选项:

--reset-offsets also has following scenarios to choose from (atleast one scenario must be selected):

--to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest : Reset offsets to earliest offset.
--to-latest : Reset offsets to latest offset.
--shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
--from-file : Reset offsets to values defined in CSV file.
--to-current : Resets offsets to current offset.
--by-duration <String: duration> : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
--to-offset : Reset offsets to a specific offset.
rt4zxlrg

rt4zxlrg9#

  1. https://stackoverflow.com/a/17084401/3821653
  2. http://mail-archives.apache.org/mod_mbox/kafka-users/201403.mbox/%3ccaog_4qyz2ynh45a8kxb8qw7xw4vdrrwnqmn5j9erfxj8rfkgcg@mail.gmail.com%3e
    要重置使用者组,可以删除zookeeper组id
import kafka.utils.ZkUtils;
 ZkUtils.maybeDeletePath(<zkhost:zkport>, </consumers/group.id>);`
gg0vcinb

gg0vcinb10#

这适用于0.9.x使用者。基本上,当您创建一个消费者时,您需要使用属性为这个消费者分配一个消费者组id ConsumerConfig.GROUP_ID_CONFIG . 每次启动消费者执行类似操作时,随机生成消费者组id properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); (properties是将传递给构造函数的java.util.properties的示例 new KafkaConsumer(properties) ).
随机生成客户机意味着在kafka中新的消费者组没有任何与之相关联的偏移量。所以我们接下来要做的就是为这个场景制定一个策略。作为 auto.offset.reset 酒店说:
如果kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:
最早:自动将偏移量重置为最早偏移量
最新:自动将偏移重置为最新偏移
无:如果未找到先前的偏移量或使用者的组,则向使用者抛出异常
其他:向消费者抛出异常。
因此,从上面列出的选项中,我们需要选择 earliest 所以新的消费群体每次都是从头开始。
java中的代码如下所示:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer(properties);

你现在唯一需要弄清楚的是,当有多个消费者属于同一个消费者组,但被分配时,如何生成一个随机id并在这些示例之间分配它,使它们都属于同一个消费者组。
希望有帮助!

相关问题