我是个新手 Kafka
我们正在使用 Kafka 0.8.1
.
我需要做的是使用来自主题的消息。为此,我必须用java编写一个consumer,它将使用topic中的一条消息,然后将该消息保存到数据库中。保存消息后,一些确认信息将被发送给java使用者。如果确认为true,则应该使用主题中的下一条消息。如果acknowldgement为false(这意味着由于某些错误消息,从主题中读取的内容无法保存到数据库中),则应再次读取该消息。
我想我需要使用 Simple Consumer
,以控制消息偏移量,并已完成此链接中给出的简单消费者示例https://cwiki.apache.org/confluence/display/kafka/0.8.0+simpleconsumer+example.
在本例中,偏移量在run方法中计算为' readOffset
'. 我需要玩这个吗?例如,我可以使用 LatestTime()
而不是 EarliestTime()
如果为false,我会在使用之前将偏移量重置为1 offset - 1
.
我应该这样做吗?
1条答案
按热度按时间b0zn9rqh1#
我认为你可以使用高级消费者(http://kafka.apache.org/documentation.html#highlevelconsumerapi),这应该比simpleconsumer更易于使用。我不认为使用者需要在数据库出现故障时重新读取来自kafka的消息,因为使用者已经有了这些消息,可以将它们重新发送到db或执行任何它认为合适的操作。
高级使用者将从特定分区读取的最后一个偏移量存储在zookeeper中(基于使用者组名称),以便当使用者进程死亡并稍后重新启动(可能在其他主机上)时,它可以继续处理其停止的消息。可以定期将此偏移量自动保存到zookeeper(请参阅使用者属性auto.commit.enable和auto.commit.interval.ms),或者通过调用
ConsumerConnector.commitOffsets
. 另请参见https://cwiki.apache.org/confluence/display/kafka/consumer+group+example .我建议您关闭自动提交,并在收到db确认后自行提交偏移量。因此,您可以确保在使用者失败的情况下从kafka重新读取未处理的消息,并且提交给kafka的所有消息最终都将至少到达db一次(但不是“恰好一次”)。