如何允许spark忽略丢失的输入文件?

waxmsbnn  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(309)

我想在一些生成的包含avro文件的s3路径上运行spark作业(sparkv1.5.1)。我正在给他们加载:

val avros = paths.map(p => sqlContext.read.avro(p))

但有些路径将不存在。我怎样才能让spark忽略那些空路径?以前我使用过这个答案,但是我不知道如何在新的dataframeapi中使用这个答案。
注:我理想的做法是寻找一个类似的链接答案,只是使输入路径可选。我并不特别想显式地检查s3中是否存在路径(因为这很麻烦,可能会使开发变得尴尬),但是如果现在没有干净的方法来实现这一点,我想这就是我的退路。

cbjzeqam

cbjzeqam1#

我会用scala Try 键入以处理读取avro文件目录时出现故障的可能性。通过“try”,我们可以在代码中明确失败的可能性,并以功能性的方式进行处理:

object Main extends App {

  import scala.util.{Success, Try}
  import org.apache.spark.{SparkConf, SparkContext}
  import com.databricks.spark.avro._

  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("example"))
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  //the first path exists, the second one doesn't
  val paths = List("/data/1", "/data/2")

  //Wrap the attempt to read the paths in a Try, then use collect to filter
  //and map with a single partial function.
  val avros =
    paths
      .map(p => Try(sqlContext.read.avro(p)))
      .collect{
        case Success(df) => df
      }
  //Do whatever you want with your list of dataframes
  avros.foreach{ df =>
    println(df.collect())
  }
  sc.stop()
}

相关问题