避免基于dir的spark load中的“路径不存在”

qoefvg9y  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(458)

我正在使用通配符从目录加载多个文件,如下所示-

val df: DataFrame = spark.read
          .format("csv")
          .option("delimiter", ",")
          .schema(schema)
          .load(inputPath + "/*.csv*")

这在很大程度上是有效的。但是当inputpath中没有任何csv文件时,我得到-

org.apache.spark.sql.AnalysisException: Path does not exist

有没有一种方法可以避免这个错误,这样我们就可以在有csv文件的情况下加载,而不是在没有要加载的情况下加载错误?

mzmfm0qo

mzmfm0qo1#

你可以把这个放进 try 块和 catch 例外

try {
       val df: DataFrame = spark.read
          .format("csv")
          .option("delimiter", ",")
          .schema(schema)
          .load(inputPath + "/*.csv*")
    }catch (Exception e) {
    print("Do something else here")
    e.getMessage();
  }

或者,如果要检查csv是否存在,可以先检查文件是否存在

import java.nio.file.{Paths, Files}
exist = Files.exists(Paths.get(inputPath +  "/*.csv*"))
if (exist){
 val df: DataFrame = spark.read
          .format("csv")
          .option("delimiter", ",")
          .schema(schema)
          .load(inputPath + "/*.csv*")

 }

如果有多条路径 inputPaths 例如,您可以将它们过滤为

inputPaths.filter(f => Files.exists(Paths.get(f +  "/*.csv*")))

对于hdfs文件系统,可以用
对于单个文件

val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val exists = fs.exists(new org.apache.hadoop.fs.Path(inputPath +  "/*.csv*"))
if (exist){
     val df: DataFrame = spark.read
              .format("csv")
              .option("delimiter", ",")
              .schema(schema)
              .load(inputPath + "/*.csv*")

     }

对于存储在数组中的多个文件位置。

val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)

inputPaths.filter(f => fs.exists(new org.apache.hadoop.fs.Path(f +  "/*.csv*")))

相关问题