consumer.poll(0); // without this the below statement never got any records
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
// ... Assuming the props have been set properly.
// ... enable.auto.commit and auto.offset.reset as default
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(0); // without this, the assignment will be empty.
consumer.assignment().forEach(t -> {
System.out.printf("Set %s to offset 0%n", t.toString());
consumer.seek(t, 0);
});
while (true) {
// ... consumer polls messages as usual.
}
// At this point, there is no heartbeat from consumer so seekToBeinning() wont work
// So call poll()
consumer.poll(0);
// Now there is heartbeat and consumer is "alive"
consumer.seekToBeginning(consumer.assignment());
// Now consume
ConsumerRecords<String, String> records = consumer.poll(0);
--reset-offsets also has following scenarios to choose from (atleast one scenario must be selected):
--to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest : Reset offsets to earliest offset.
--to-latest : Reset offsets to latest offset.
--shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
--from-file : Reset offsets to values defined in CSV file.
--to-current : Resets offsets to current offset.
--by-duration <String: duration> : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
--to-offset : Reset offsets to a specific offset.
10条答案
按热度按时间d4so4syb1#
一种可能的解决方案是在订阅一个或多个主题时使用ConsumerBalanceListener的实现。ConsumerBalanceListener包含在分配新分区或从使用者中删除新分区时的回调方法。下面的代码示例说明了这一点:
}
现在,无论何时将分区分配给使用者,都将从头开始读取每个分区。
zmeyuzjn2#
如果您更具体地使用java consumer api org.apache.kafka.clients.consumer.consumer,可以尝试seek*方法。
这里,consumer.assignment()返回分配给给定使用者的所有分区,seekToBegining将从给定分区集合的最早偏移量开始。
cwdobuhd3#
在使用高级使用者集时
props.put("auto.offset.reset", "smallest");
在创造ConsumerConfig
mbjcgjjk4#
所以对我来说,有效的是上面建议的结合。关键的变化是包括
每次都有一个随机生成的组id。但这一点对我不起作用。出于某种原因,我第一次调查消费者时,他们没有任何记录。我必须破解它才能让它工作-
我是新来Kafka,不知道为什么会发生这种情况,但对于其他人仍然试图让这项工作,希望这有帮助。
hgtggwj05#
如果您只是避免保存任何补偿,消费者将始终在开始时重置。
0pizxfdo6#
总是从偏移量0读取而不每次创建新的groupid。
ycggw6v27#
这样做的一个选择是每次启动时都有一个唯一的组id,这意味着kafka会从一开始就向您发送主题中的消息。在设置属性时执行以下操作:
KafkaConsumer
:另一种选择是使用
consumer.seekToBeginning(consumer.assignment())
但是,除非kafka首先通过让消费者调用poll方法从消费者那里获得心跳信号,否则这是行不通的。所以打电话poll()
,然后做一个seekToBeginning()
然后再打电话poll()
如果你一开始就想要所有的记录。这有点像黑客,但从0.9版本开始,这似乎是最可靠的方法。gr8qqesn8#
另一个选择是保持用户代码简单,并使用命令行工具从外部控制偏移管理
kafka-consumer-groups
那是Kafka带来的。每次启动消费者之前,您都会
根据您的需求,您可以使用该工具重置主题的每个分区的偏移量。帮助功能或文档说明了以下选项:
rt4zxlrg9#
要重置使用者组,可以删除zookeeper组id
gg0vcinb10#
这适用于0.9.x使用者。基本上,当您创建一个消费者时,您需要使用属性为这个消费者分配一个消费者组id
ConsumerConfig.GROUP_ID_CONFIG
. 每次启动消费者执行类似操作时,随机生成消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
(properties是将传递给构造函数的java.util.properties的示例new KafkaConsumer(properties)
).随机生成客户机意味着在kafka中新的消费者组没有任何与之相关联的偏移量。所以我们接下来要做的就是为这个场景制定一个策略。作为
auto.offset.reset
酒店说:如果kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:
最早:自动将偏移量重置为最早偏移量
最新:自动将偏移重置为最新偏移
无:如果未找到先前的偏移量或使用者的组,则向使用者抛出异常
其他:向消费者抛出异常。
因此,从上面列出的选项中,我们需要选择
earliest
所以新的消费群体每次都是从头开始。java中的代码如下所示:
你现在唯一需要弄清楚的是,当有多个消费者属于同一个消费者组,但被分配时,如何生成一个随机id并在这些示例之间分配它,使它们都属于同一个消费者组。
希望有帮助!