我有一个Kafka主题,我想从最早的事件读。
我想做的是从一个主题(从时间上绝对最早的事件)获取所有数据,直到某个特定日期的事件。
每个事件的结构都有一个名为 dateCliente
我使用它作为筛选事件的阈值。到目前为止,我已经完成了读写。我正在写入一个临时Parquet文件,该文件用作配置单元表的分区。这是工作正常,但是,即使我已经指定了最早在 auto.offset.reset
参数,它不是从一开始就读取数据。
每当我运行我的代码时,我就会得到从这个日期开始的所有事件。每次我再次执行代码时,它都会在上一次代码执行中读取的最后一个事件之后继续读取kafka事件。
我用于配置kafka使用者和订阅主题的代码如下:
// Configurations for kafka consumer
val conf = ConfigFactory.parseResources("properties.conf")
val brokersip = conf.getString("enrichment.brokers.value")
val topics_in = conf.getString("enrichment.topics_in.value")
//
// Crea la sesion de Spark
val spark = SparkSession
.builder()
.master("yarn")
.appName("XY")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val properties = new Properties
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("bootstrap.servers", brokersip)
properties.put("auto.offset.reset", "earliest")
properties.put("group.id", "XY")
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe( util.Collections.singletonList("geoevents") )
但是,每当我从命令行创建一个使用者以便从主题中读取数据时,我都会得到前几天的所有事件。我运行的命令行命令是:
kafka-console-consumer --new-consumer --topic geoevents --from-beginning --bootstrap-server xx.yy.zz.xx
你知道为什么我的代码会这样,而忽略我的代码吗 "earliest"
在 auto.offset.reset
?
3条答案
按热度按时间yizd12fk1#
如果要从所有分区的第一个事件中读取主题,则可以重置偏移量
gjmwrych2#
是因为
auto.offset.reset
仅当组没有提交的偏移量时才应用。请参阅使用者配置文档:
如果kafka中没有初始偏移量,或者服务器上不再存在当前偏移量,该怎么办
如果要从头开始重新启动,可以:
使用新的组名(例如append)
System.currentTimeMillis()
致集团(anme)使用将使用者的位置显式移动到分区的开头
seekToBeginning()
: http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#seektobeginning-java.util.collection集合-jobtbby33#
财产
auto.offset.reset
仅当kafka中存储的给定使用者没有偏移量时才使用。当您提交记录时,kafka将记录的偏移量存储在一个特殊的主题中,并且在下一次运行中,您的使用者将从上一次提交的偏移量中读取主题。要从头开始读,你应该打电话consumer.seekToBeginning
或使用独特的group.id
财产。