apachespark-检查文件是否存在

yi0zb3m4  于 2021-06-02  发布在  Hadoop
关注(0)|答案(8)|浏览(391)

我是新来的Spark,我有一个问题。我有两个步骤,第一步是将success.txt文件写入hdfs上的某个位置。我的第二步是spark作业,它必须在开始处理数据之前验证success.txt文件是否存在。
我检查了sparkapi,没有找到任何检查文件是否存在的方法。你知道怎么处理吗?
我找到的唯一方法是sc.textfile(hdfs:///success.txt).count(),它会在文件不存在时引发异常。我必须捕获这个异常并相应地编写程序。我真的不喜欢这种方法。希望找到更好的选择。

sycxhyv7

sycxhyv71#

对于java程序员;

SparkConf sparkConf = new SparkConf().setAppName("myClassname");
        SparkContext sparky = new SparkContext(sparkConf);       
        JavaSparkContext context = new JavaSparkContext(sparky);

     FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
            Path path = new Path(sparkConf.get(path_to_File));

            if (!hdfs.exists(path)) {
                 //Path does not exist.
            } 
         else{
               //Path exist.
           }
q9rjltbz

q9rjltbz2#

对于pyspark python用户:
我没有找到任何关于python或pyspark的东西,所以我们需要从python代码执行hdfs命令。这对我有用。
获取文件夹是否存在的hdfs命令:如果为真,则返回0

hdfs dfs -test -d /folder-path

获取文件是否存在的hdfs命令:如果为真,则返回0

hdfs dfs -test -d /folder-path

为了将其放入python代码中,我遵循以下代码行:

import subprocess

def run_cmd(args_list):
    proc = subprocess.Popen(args_list, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
    proc.communicate()
    return proc.returncode

cmd = ['hdfs', 'dfs', '-test', '-d', "/folder-path"]
            code = run_cmd(cmd)
if code == 0:
    print('folder exist')
    print(code)

如果文件夹存在,则输出:
文件夹存在0

6yoyoihd

6yoyoihd3#

对于Pypark:

from py4j.protocol import Py4JJavaError
def path_exist(path):
    try:
        rdd = sc.textFile(path)
        rdd.take(1)
        return True
    except Py4JJavaError as e:
        return False
vyswwuz2

vyswwuz24#

对于hdfs中的文件,可以使用hadoop方法来执行以下操作:

val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val exists = fs.exists(new org.apache.hadoop.fs.Path("/path/on/hdfs/to/SUCCESS.txt"))
pxiryf3j

pxiryf3j5#

我要说的是,调用这个函数的最好方法是在传统的hadoop文件检查中内部检查文件的存在。

object OutputDirCheck {
  def dirExists(hdfsDirectory: String): Boolean = {
    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
    fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
  }
}
b1zrtrql

b1zrtrql6#

对于pyspark,您可以通过以下方式实现这一点,而无需调用子流程:

fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))
5t7ly7z5

5t7ly7z57#

使用dbutils:

def path_exists(path):
  try:
    if len(dbutils.fs.ls(path)) > 0:
      return True
  except:
    return False
plupiseo

plupiseo8#

对于spark 2.0或更高版本,可以使用hadoop.fr.filesystem的exist方法
:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object Test extends App {
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("BigDataETL - Check if file exists")
    .getOrCreate()

  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  // This methods returns Boolean (true - if file exists, false - if file doesn't exist
  val fileExists = fs.exists(new Path("<parh_to_file>"))
  if (fileExists) println("File exists!")
  else println("File doesn't exist!")
}

Spark1.6至2.0

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object Test extends App {
  val sparkConf = new SparkConf().setAppName(s"BigDataETL - Check if file exists")
  val sc = new SparkContext(sparkConf)
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val fileExists = fs.exists(new Path("<parh_to_file>"))
  if (fileExists) println("File exists!")
  else println("File doesn't exist!")
}

相关问题