KafkaConsumer does not read from offset 0

jhkqcmku · posted 2021-06-07 in Kafka

I want to test a Kafka example. I am using Kafka 0.10.0.1. The producer:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerApp extends App {

  val topic = "topicTest"
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  for (i <- 0 to 20) {
    val record = new ProducerRecord[String, String](topic, "key " + i, " value " + i)
    producer.send(record)
    Thread.sleep(100)
  }
  producer.close()
}

The consumer (topic "topicTest" was created with 1 partition):

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}

object ConsumerApp extends App {

  val topic = "topicTest"
  val properties = new Properties
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
  properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](properties)
  consumer.subscribe(List(topic).asJava)

  while (true) {
    // rewind to the start of every assigned partition before each poll
    consumer.seekToBeginning(consumer.assignment())
    val records: ConsumerRecords[String, String] = consumer.poll(20000)
    println("records size " + records.count())
    records.asScala.foreach(rec => println("offset " + rec.offset()))
  }
}

The problem is that the consumer does not read from offset 0 on the first iteration, only on subsequent ones. I would like to know why, and how to make the consumer read from offset 0 on every iteration. The expected result is:

records size 6
offset 0
offset 1
offset 2
offset 3
offset 4
offset 5
records size 6
offset 0
offset 1
offset 2
offset 3
offset 4
offset 5
...

but the actual result is:

records size 4
offset 2
offset 3
offset 4
offset 5
records size 6
offset 0
offset 1
offset 2
offset 3
offset 4
offset 5
...

envsm3lx1#

I am not sure exactly what the error is; the code I wrote is essentially the same as yours, but it works fine for me. If you like, you can use the snippet below.

import java.util
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object ConsumerExample extends App {

  val TOPIC = "test-stack"

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("group.id", "testinf")
  props.put("auto.offset.reset", "earliest")
  props.put("enable.auto.commit", "false")

  // the listener reports when the group rebalance has completed and
  // partitions have actually been assigned to this consumer
  val listener = new ConsumerRebalanceListener() {
    override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
      println("Assignment : " + partitions)
    }
    override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
      // do nothing
    }
  }

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(util.Collections.singletonList(TOPIC), listener)

  while (true) {
    consumer.seekToBeginning(consumer.assignment())
    val records = consumer.poll(20000)
    println("records size " + records.count())
    records.asScala.foreach(rec => println("offset " + rec.offset()))
  }
}

Give it a try and let me know if you have any questions.
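A likely explanation for the original behavior: `consumer.assignment()` is empty until the first `poll()` completes the group rebalance, so on the first iteration `seekToBeginning(consumer.assignment())` receives an empty collection and silently does nothing, and the first poll starts from the previously committed offset instead of 0. One way to guarantee the seek runs after partitions exist is to do it inside `onPartitionsAssigned`. A minimal sketch of that pattern (the object name `SeekOnAssignApp` is made up for illustration; it assumes a broker on `localhost:9092` and the 0.10.x consumer API, where `seekToBeginning` takes a `Collection[TopicPartition]`):

```scala
import java.util
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object SeekOnAssignApp extends App {

  val topic = "topicTest"
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "consumer")
  props.put("enable.auto.commit", "false")
  props.put("auto.offset.reset", "earliest")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)

  // Seek inside the rebalance callback: at this point the partitions are
  // guaranteed to be assigned, so the seek cannot be a no-op.
  consumer.subscribe(util.Collections.singletonList(topic), new ConsumerRebalanceListener {
    override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit =
      consumer.seekToBeginning(partitions)
    override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = ()
  })

  while (true) {
    val records = consumer.poll(20000)
    println("records size " + records.count())
    records.asScala.foreach(rec => println("offset " + rec.offset()))
    // rewind for the next iteration; assignment() is non-empty by now
    consumer.seekToBeginning(consumer.assignment())
  }
}
```

With this structure the very first poll should also return offsets starting at 0, matching the expected output in the question.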
