I installed ZooKeeper and Kafka through Ambari on CentOS 7.
Ambari version: 2.1.2.1
ZooKeeper version: 3.4.6.2.3
Kafka version: 0.8.2.2.3
Java Kafka client: kafka_2.10, 0.8.2.2
I am trying to commit Kafka offsets with the following code:
SimpleConsumer simpleConsumer = new SimpleConsumer(host, port, soTimeout, bufferSize, clientId);
TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, partitionId);
Map<TopicAndPartition, OffsetAndMetadata> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition, new OffsetAndMetadata(readOffset, "", ErrorMapping.NoError()));
OffsetCommitRequest offsetCommitRequest = new OffsetCommitRequest(groupName, requestInfo, correlationId, clientName, (short)0);
simpleConsumer.commitOffsets(offsetCommitRequest);
simpleConsumer.close();
But when I run this program, my client fails with the following error:
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
The Kafka broker log also shows the following error:
[2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:498)
at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:65)
at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:55)
at kafka.network.Processor.read(SocketServer.scala:547)
at kafka.network.Processor.run(SocketServer.scala:405)
at java.lang.Thread.run(Thread.java:745)
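For readers unfamiliar with this exception: `HeapByteBuffer.getLong` throws `BufferUnderflowException` when fewer than 8 bytes remain in the buffer, so the trace shows the broker running out of bytes while decoding the commit request. One possible (unconfirmed) reading is that the client and broker disagree on the request layout. The sketch below reproduces only that mechanism with plain `java.nio`. The class name, method names, and both field layouts are hypothetical illustrations and are not Kafka's actual wire format.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class UnderflowDemo {
    // Hypothetical "writer" layout: partition (int32) + offset (int64).
    static ByteBuffer encodeOldLayout(int partition, long offset) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 8);
        buf.putInt(partition);
        buf.putLong(offset);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Hypothetical "reader" layout: partition (int32) + offset (int64) + timestamp (int64).
    static long[] decodeNewLayout(ByteBuffer buf) {
        int partition = buf.getInt();
        long offset = buf.getLong();
        // Throws BufferUnderflowException if fewer than 8 bytes remain.
        long timestamp = buf.getLong();
        return new long[] {partition, offset, timestamp};
    }

    public static void main(String[] args) {
        ByteBuffer request = encodeOldLayout(0, 42L);
        try {
            decodeNewLayout(request);
            System.out.println("decoded without error");
        } catch (BufferUnderflowException e) {
            System.out.println("BufferUnderflowException: reader expected more bytes than were sent");
        }
    }
}
```

If a mismatch of this kind is the cause here, comparing the request versions supported by the client jar and by the Ambari-packaged broker would be a reasonable next check.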
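I have now also downloaded and installed the official Kafka 0.8.2.2 release (https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz), and it works fine: offsets can be committed without any error.
Can anyone point me in the right direction as to why the Ambari-installed Kafka cannot commit offsets?
P.S. I know that if versionId is 0 (in OffsetCommitRequest), the offsets are actually stored in ZooKeeper.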