spark流检查点

kx5bkwkv  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(477)

我正在用spark kafka直接流媒体阅读来自kafka的信息。我想实现零消息丢失,重启spark后,它必须读取kafka丢失的消息。我正在使用checkpoint保存所有的读取偏移量,以便下次spark将从存储的偏移量开始读取。这是我的理解。
我使用了下面的代码。我停止了我的Spark,给Kafka发了几条信息。重新启动spark后,它不会读取来自kafka的遗漏消息。spark阅读Kafka的最新消息。如何阅读Kafka错过的信息?

val ssc = new StreamingContext(spark.sparkContext, Milliseconds(6000))
ssc.checkpoint("C:/cp")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val ssc = new StreamingContext(spark.sparkContext, Milliseconds(50))
val msgStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

注意:应用程序日志显示auto.offset.reset为none,而不是latest。为什么?

WARN KafkaUtils: overriding auto.offset.reset to none for executor

sbt公司

scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"

窗口:7

1wnzp6jl

1wnzp6jl1#

我建议不要依赖于检查点,相反,您可以使用外部数据存储来保存已处理的kafka消息偏移量。请按照链接获取一些信息。https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

mjqavswn

mjqavswn2#

如果要读取丢失的消息,请尝试提交过程而不是检查点。
请理解,spark无法读取具有以下属性的旧邮件:

"auto.offset.reset" -> "latest"

试试这个:

val kafkaParams = Map[String, Object](
 //...
 "auto.offset.reset" -> "earliest",
 "enable.auto.commit" -> (false: java.lang.Boolean)
 //...
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  //Your processing goes here

  //Then commit after completing your process.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

希望这有帮助。

相关问题