spark结构化流媒体中kafkasource的“偏移量从x更改为0”错误是什么?

k7fdbhmy  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(485)

我得到了一个错误“偏移量从x变为0,一些数据可能丢失了”,在一个带有检查点的spark结构化流应用程序中使用kafkasource,但它实际上似乎没有引起任何问题。我在想这个错误到底意味着什么。
我的设置如下。
我让kafka(0.10.1.0)在docker容器中运行,在/tmp/kafka日志中装载了一个命名卷,以便在重启之间保存日志。
我在另一个docker容器中有一个spark structured streaming(2.1.1)应用程序。这些流使用来自Kafka的数据。它们还在重新装入命名卷的位置中使用检查点,以确保元数据在重新启动之间保持不变。
我使用一个实现foreachwriter接口的定制接收器,这意味着我必须实现我自己的已处理版本日志,这样当一切重新启动时,我可以告诉spark streaming不要重新处理已经处理过的内容。
所有这些工作都很好,数据从kafka得到正确的使用,我的自定义接收器正确地处理它。
现在,如果我关闭spark流应用程序,让kafka中的数据堆积起来,然后重新启动spark流,它将抛出以下错误,表明一些数据在kafka中不再可用

ERROR StreamExecution: Query [id = cd2b69e1-2f24-439a-bebc-89e343df83a8, runId = d4b3ae65-8cfa-4713-912c-404623710048] terminated with error

Java.lang.IllegalStateException: Partition input.clientes-0's offset
 was changed from 908 to 0, some data may have been missed.

Some data may have been lost because they are not available in Kafka
 any more; either the data was aged out by Kafka or the topic may have 
 been deleted before all the data in the topic was processed. If you 
 don't want your streaming query to fail on such cases, set the source 
 option "failOnDataLoss" to "false".

at org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:329)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:283)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:281)
at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:452)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:448)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:448)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:448)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:447)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)

但在抛出错误后,我看到流正常启动。spark streaming正确地将堆积在kafka中的数据推送到我的自定义接收器中,并具有预期的版本。然后,我的接收器继续并正确处理新数据。
因此,这个错误表明一些数据在kafka中不再可用,但它仍然能够通过spark流正确地使用。
如果我重新启动spark流应用程序,即使没有数据被推送到kafka,我也会再次遇到同样的错误。如果我开始向Kafka推送新数据,系统将继续正确处理。
有人知道这里会发生什么吗?我解释错误了吗?

x8goxv8g

x8goxv8g1#

/tmp/kafka-logs 是kafka的日志目录,其中存储了所有偏移量、主题信息。如果它已损坏或某些数据已删除,则需要设置该选项 failOnDataLoss:false 在Kafka你的选择 SparkProcessContext 重新启动spark作业。

Option  : failOnDataLoss
Value   : true or false
Default : TRUE

含义:当数据可能丢失时(例如,主题被删除或偏移量超出范围),是否使查询失败。这可能是虚惊一场。当它不能按预期工作时,可以禁用它。

相关问题