Unable to save Kafka offsets in ZooKeeper

Posted by cczfrluj on 2021-06-08 in Kafka

I installed ZooKeeper and Kafka on CentOS 7 using Ambari.
Ambari version: 2.1.2.1
ZooKeeper version: 3.4.6.2.3
Kafka version: 0.8.2.2.3
Java Kafka client: kafka_2.10, 0.8.2.2
I am trying to save a Kafka offset with the following code:

SimpleConsumer simpleConsumer = new SimpleConsumer(host, port, soTimeout, bufferSize, clientId);
TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, partitionId);
Map<TopicAndPartition, OffsetAndMetadata> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition, new OffsetAndMetadata(readOffset, "", ErrorMapping.NoError()));
OffsetCommitRequest offsetCommitRequest = new OffsetCommitRequest(groupName, requestInfo, correlationId, clientName, (short)0);
simpleConsumer.commitOffsets(offsetCommitRequest);
simpleConsumer.close();

But when I run this program, my client fails with the following error:

java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.

The Kafka broker log also shows the following error:

[2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:498)
    at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
    at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
    at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
    at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:65)
    at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
    at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:55)
    at kafka.network.Processor.read(SocketServer.scala:547)
    at kafka.network.Processor.run(SocketServer.scala:405)
    at java.lang.Thread.run(Thread.java:745)

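I also downloaded and installed the official Kafka 0.8.2.2 release (https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz), and it works fine: offsets are saved without any errors.
Can anyone point me in the right direction as to why Kafka cannot save the offset?
P.S. I know that if versionId is 0 (in OffsetCommitRequest), the offset is actually saved in ZooKeeper.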
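
For anyone who wants to check whether a version-0 commit actually landed in ZooKeeper: the old ZooKeeper-based consumer layout keeps each offset under /consumers/<group>/offsets/<topic>/<partition>. Below is a minimal sketch that only builds that path so it can be inspected by hand. The class name, method name, and the group and topic values are hypothetical placeholders, not taken from the code above.

```java
// Hypothetical helper: builds the znode path used by the old
// ZooKeeper-based consumer offset storage.
class ZkOffsetPath {

    // Returns /consumers/<group>/offsets/<topic>/<partition>.
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        // "my-group" and "my-topic" are placeholder values (assumptions).
        System.out.println(offsetPath("my-group", "my-topic", 0));
    }
}
```

The printed path can then be read in the ZooKeeper CLI (zkCli.sh) with `get <path>`, assuming the commit succeeded and the znode exists.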
