我使用结构化流+Kafka在我们的项目实时数据分析。我用的是spark 2.2,kafka 0.10.2。
在应用程序启动时从检查点恢复流式查询时,我遇到了一个问题。因为有多个来自单个kafka流点的流查询,并且每个流查询都有不同的checkpint目录。因此,在作业失败的情况下,当我们重新启动作业时,会有一些流式查询无法从检查点位置恢复,因此会引发读取增量文件时出错的异常。以下是日志:
Job aborted due to stage failure: Task 2 in stage 13.0 failed 4 times, most recent failure: Lost task 2.3 in stage 13.0 (TID 831, ip-172-31-10-246.us-west-2.compute.internal, executor 3): java.lang.IllegalStateException: Error reading delta file /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /checkpointing/wifiHealthPerUserPerMinute/state/0/2]: /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta does not exist
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
同样的,请帮帮我。可能有解决这个问题的办法,请建议我,如果有的话,或者可能是它是一个错误。
1条答案
按热度按时间pbgvytdp1#
你的检查站在哪里?这通常是因为您使用本地文件系统来存储检查点。确保您设置了“checkpointlocation”选项,它指向所有节点都可以访问的分布式文件系统(如hdfs)[1]
[1] http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-从检查点失败