在我给出任何细节之前,请注意,我不是在问如何使用kafka-run-class.sh kafka.tools.consumerofsectcher从控制台获取最新偏移量。
我正在尝试使用scala(2.11.8)在spark(2.3.1)中创建一个kafka消费者(kafka版本0.10),它将是容错的。所谓容错,我的意思是,如果由于某种原因,kafka使用者死亡并重新启动,它应该继续使用最后一个偏移量的消息。
为了实现这一点,我使用下面的代码在使用kafka偏移量之后提交它,
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_101",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean), /*because messages successfully polled by the consumer may not yet have resulted in a Spark output operation*/
"session.timeout.ms" -> (30000: java.lang.Integer),
"heartbeat.interval.ms" -> (3000: java.lang.Integer)
)
val topic = Array("topic_1")
val offsets = Map(new org.apache.kafka.common.TopicPartition("kafka_cdc_1", 0) -> 2L) /*Edit: Added code to fetch offset*/
val kstream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topic, kafkaParams, offsets) /*Edit: Added offset*/
)
kstream.foreachRDD{ rdd =>
val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
if(!rdd.isEmpty()) {
val rawRdd = rdd.map(record =>
(record.key(),record.value())).map(_._2).toDS()
val df = spark.read.schema(tabSchema).json(rawRdd)
df.createOrReplaceTempView("temp_tab")
df.write.insertInto("hive_table")
}
kstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRange) /*Doing Async Commit Here */
}
我试过很多方法来获取给定主题的最新偏移量,但都没有成功。
有人能帮我用scala代码来实现这个吗?
edit:在上面的代码中,我尝试使用
val offsets = Map(new org.apache.kafka.common.TopicPartition("kafka_cdc_1", 0) -> 2L) /*Edit: Added code to fetch offset*/
但是上面代码获取的偏移量是0,不是最新的。有没有最新的偏移量?
1条答案
按热度按时间wfveoks01#
找到了解决上述问题的方法。给你。希望它能帮助有需要的人。
语言:scala,spark job