我正在与Kafka0.8和zookeeper 3.3.5合作。事实上,我们有一打主题,我们正在消费没有任何问题。
最近,我们开始喂养和消费一个行为怪异的新主题。消耗的偏移量突然重置。它尊重我们设置的auto.offset.reset策略(实际上是最小的),但是我不明白为什么这个主题突然重置了它的偏移量。
我用的是高级消费者。
以下是我发现的一些错误日志:我们有一堆错误日志:
[2015-03-26 05:21:17,789] INFO Fetching metadata from broker id:1,host:172.16.23.1,port:9092 with correlation id 47 for 1 topic(s) Set(MyTopic) (kafka.cl
ient.ClientUtils$)
[2015-03-26 05:21:17,789] ERROR Producer connection to 172.16.23.1:9092 unsuccessful (kafka.producer.SyncProducer)
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
每次发生此问题时,我都会看到警告日志:
[2015-03-26 05:21:30,596] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
然后真正的问题发生了:
[2015-03-26 05:21:47,551] INFO Connected to 172.16.23.5:9092 for producing (kafka.producer.SyncProducer)
[2015-03-26 05:21:47,552] INFO Disconnecting from 172.16.23.5:9092 (kafka.producer.SyncProducer)
[2015-03-26 05:21:47,553] INFO [ConsumerFetcherManager-1427047649942] Added fetcher for partitions ArrayBuffer([[MyTopic,0], initOffset 45268422051 to br
oker id:5,host:172.16.23.5,port:9092] ) (kafka.consumer.ConsumerFetcherManager)
[2015-03-26 05:21:47,553] INFO [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Starting (kafka.consumer.Cons
umerFetcherThread)
[2015-03-26 05:21:50,388] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 45268422051 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,490] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,591] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,692] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
现在的问题是:是否有人已经经历过这种行为?有没有人能告诉我Kafka什么时候决定重置它的偏移量,auto.offset.reset是最大的还是最小的?
谢谢您。
1条答案
按热度按时间yqkkidmi1#
现在的情况是你在一段时间内把主题展开得太慢了。
kafka有一个保留模型,它不是基于消费者是否获得了数据,而是基于磁盘使用情况和/或周期。在某个时候,你太晚了,你需要的下一条消息已经被删除,并且不再可用,因为Kafka已经清理了数据。因此
Current offset 45268422051 for partition [MyTopic,0] out of range; reset offset to 1948447612
信息。然后,在您的情况下,您的消费者将您的重置策略再次应用于引导本身。
当您有突发的工作流时,这是一个常见的问题,有时会超出数据保留范围。它可能消失了,因为你提高了你的开卷速度,或增加了你的保留策略,以便能够生存的爆发。