为什么kafka consumer忽略auto.offset.reset参数中我的“最早”指令,从而不从绝对第一个事件中读取我的主题?

tct7dpnv  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(426)

我有一个Kafka主题,我想从最早的事件读。
我想做的是从一个主题(从时间上绝对最早的事件)获取所有数据,直到某个特定日期的事件。
每个事件的结构都有一个名为 dateCliente 我使用它作为筛选事件的阈值。到目前为止,我已经完成了读写。我正在写入一个临时Parquet文件,该文件用作配置单元表的分区。这是工作正常,但是,即使我已经指定了最早在 auto.offset.reset 参数,它不是从一开始就读取数据。
每当我运行我的代码时,我就会得到从这个日期开始的所有事件。每次我再次执行代码时,它都会在上一次代码执行中读取的最后一个事件之后继续读取kafka事件。
我用于配置kafka使用者和订阅主题的代码如下:

// Configurations for kafka consumer
  val conf = ConfigFactory.parseResources("properties.conf")
  val brokersip = conf.getString("enrichment.brokers.value")
  val topics_in = conf.getString("enrichment.topics_in.value")
  //

  // Crea la sesion de Spark
  val spark = SparkSession
    .builder()
    .master("yarn")
    .appName("XY")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._

  val properties = new Properties
  properties.put("key.deserializer", classOf[StringDeserializer])
  properties.put("value.deserializer", classOf[StringDeserializer])
  properties.put("bootstrap.servers", brokersip)
  properties.put("auto.offset.reset", "earliest")
  properties.put("group.id", "XY")

  val consumer = new KafkaConsumer[String, String](properties)
  consumer.subscribe( util.Collections.singletonList("geoevents") )

但是,每当我从命令行创建一个使用者以便从主题中读取数据时,我都会得到前几天的所有事件。我运行的命令行命令是:

kafka-console-consumer --new-consumer --topic geoevents --from-beginning --bootstrap-server xx.yy.zz.xx

你知道为什么我的代码会这样,而忽略我的代码吗 "earliest"auto.offset.reset ?

yizd12fk

yizd12fk1#

如果要从所有分区的第一个事件中读取主题,则可以重置偏移量

kafka-consumer-groups --bootstrap-server <host-ip>:<port> --group <group-name> --reset-offsets --execute --to-earliest --topic <topic>
gjmwrych

gjmwrych2#

是因为 auto.offset.reset 仅当组没有提交的偏移量时才应用。
请参阅使用者配置文档:
如果kafka中没有初始偏移量,或者服务器上不再存在当前偏移量,该怎么办
如果要从头开始重新启动,可以:
使用新的组名(例如append) System.currentTimeMillis() 致集团(anme)
使用将使用者的位置显式移动到分区的开头 seekToBeginning() : http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#seektobeginning-java.util.collection集合-

jobtbby3

jobtbby33#

财产 auto.offset.reset 仅当kafka中存储的给定使用者没有偏移量时才使用。当您提交记录时,kafka将记录的偏移量存储在一个特殊的主题中,并且在下一次运行中,您的使用者将从上一次提交的偏移量中读取主题。要从头开始读,你应该打电话 consumer.seekToBeginning 或使用独特的 group.id 财产。

相关问题