apachekafka中的java读取消息偏移量

cgfeq70w  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(402)

我是个新手 Kafka 我们正在使用 Kafka 0.8.1 .
我需要做的是使用来自主题的消息。为此,我必须用java编写一个consumer,它将使用topic中的一条消息,然后将该消息保存到数据库中。保存消息后,一些确认信息将被发送给java使用者。如果确认为true,则应该使用主题中的下一条消息。如果acknowldgement为false(这意味着由于某些错误消息,从主题中读取的内容无法保存到数据库中),则应再次读取该消息。
我想我需要使用 Simple Consumer ,以控制消息偏移量,并已完成此链接中给出的简单消费者示例https://cwiki.apache.org/confluence/display/kafka/0.8.0+simpleconsumer+example.
在本例中,偏移量在run方法中计算为' readOffset '. 我需要玩这个吗?例如,我可以使用 LatestTime() 而不是 EarliestTime() 如果为false,我会在使用之前将偏移量重置为1 offset - 1 .
我应该这样做吗?

b0zn9rqh

b0zn9rqh1#

我认为你可以使用高级消费者(http://kafka.apache.org/documentation.html#highlevelconsumerapi),这应该比simpleconsumer更易于使用。我不认为使用者需要在数据库出现故障时重新读取来自kafka的消息,因为使用者已经有了这些消息,可以将它们重新发送到db或执行任何它认为合适的操作。
高级使用者将从特定分区读取的最后一个偏移量存储在zookeeper中(基于使用者组名称),以便当使用者进程死亡并稍后重新启动(可能在其他主机上)时,它可以继续处理其停止的消息。可以定期将此偏移量自动保存到zookeeper(请参阅使用者属性auto.commit.enable和auto.commit.interval.ms),或者通过调用 ConsumerConnector.commitOffsets . 另请参见https://cwiki.apache.org/confluence/display/kafka/consumer+group+example .
我建议您关闭自动提交,并在收到db确认后自行提交偏移量。因此,您可以确保在使用者失败的情况下从kafka重新读取未处理的消息,并且提交给kafka的所有消息最终都将至少到达db一次(但不是“恰好一次”)。

相关问题